1. Hi, I have a problem with framing. I tried to build the ping-pong example from http://doc.akka.io/docs/akka/2.4.4/java/stream/stream-graphs.html. When I run it over TCP, messages get lost:
trait Message > case class Ping(id: Int) extends Message > case class Pong(id: Int) extends Message > def toBytes(msg: Message): ByteString = { > implicit val order = ByteOrder.LITTLE_ENDIAN > msg match { > case Ping(id) => > ByteString.newBuilder.putByte(1).putInt(id).result() > case Pong(id) => > ByteString.newBuilder.putByte(2).putInt(id).result() > } > } > def fromBytes(bytes: ByteString): Message = { > implicit val order = ByteOrder.LITTLE_ENDIAN > val it = bytes.iterator > it.getByte match { > case 1 => Ping(it.getInt) > case 2 => Pong(it.getInt) > case other => throw new RuntimeException(s"parse error: expected 1|2 got > $other") > } > } > val codec = BidiFlow.fromFunctions(toBytes _, fromBytes _) > val framing = BidiFlow.fromGraph(create() { b => > implicit val order = ByteOrder.LITTLE_ENDIAN > > def addLengthHeader(bytes: ByteString) = { > val len = bytes.length > ByteString.newBuilder.putInt(len).append(bytes).result() > } > class FrameParser extends GraphStage[FlowShape[ByteString, ByteString]] { > // this holds the received but not yet parsed bytes > val in = Inlet[ByteString]("FrameParser.in") > val out = Outlet[ByteString]("FrameParser.out") > override val shape = FlowShape.of(in, out) > > override def createLogic(inheritedAttributes: Attributes): > GraphStageLogic = new GraphStageLogic(shape) { > > // this holds the received but not yet parsed bytes > var stash = ByteString.empty > // this holds the current message length or -1 if at a boundary > var needed = -1 > > setHandler(out, new OutHandler { > override def onPull(): Unit = { > if (isClosed(in)) run() > else pull(in) > } > }) > setHandler(in, new InHandler { > override def onPush(): Unit = { > val bytes = grab(in) > stash = stash ++ bytes > run() > } > override def onUpstreamFinish(): Unit = { > if (stash.isEmpty) completeStage() > // wait with completion and let run() complete when the > // rest of the stash has been sent downstream > } > }) > private def run(): Unit = { > if (needed == -1) { 
> // are we at a boundary? then figure out next length > if (stash.length < 4) { > if (isClosed(in)) completeStage() > else pull(in) > } else { > needed = stash.iterator.getInt > stash = stash.drop(4) > run() // cycle back to possibly already emit the next chunk > } > } else if (stash.length < needed) { > // we are in the middle of a message, need more bytes, > // or have to stop if input closed > if (isClosed(in)) completeStage() > else pull(in) > } else { > // we have enough to emit at least one message, so do it > val emit = stash.take(needed) > stash = stash.drop(needed) > needed = -1 > push(out, emit) > } > } > } > } > > val outbound = b.add(Flow[ByteString].map(addLengthHeader)) > val inbound = b.add(Flow[ByteString].via(new FrameParser)) > BidiShape.fromFlows(outbound, inbound) > }) > > val protocol = codec atop framing > > val server = Tcp().bind("127.0.0.1", 0).to(Sink.foreach { > conn => > conn.flow.join(protocol.reversed).join(Flow[Message].map { > case Ping(id) => Pong(id) > case Pong(id) => Ping(id) > case other => println("error"); throw new RuntimeException(s"other: > $other") > }).run() > }).run() > > val myaddr = Await.result(server, 1.second) > // > val client = Tcp().outgoingConnection(myaddr.localAddress) > // > val stack = protocol join client > Source((0 to 20)).map { x => Ping(x) }.via(stack).runForeach(println) If I use the following (deprecated) framing instead, all messages get through: > val framing = BidiFlow.fromGraph(create() { b => > implicit val order = ByteOrder.LITTLE_ENDIAN > > def addLengthHeader(bytes: ByteString) = { > val len = bytes.length > ByteString.newBuilder.putInt(len).append(bytes).result() > } > class FrameParser extends PushPullStage[ByteString, ByteString] { > // this holds the received but not yet parsed bytes > var stash = ByteString.empty > // this holds the current message length or -1 if at a boundary > var needed = -1 > > override def onPush(bytes: ByteString, ctx: Context[ByteString]) = { > stash ++= bytes > 
run(ctx) > } > override def onPull(ctx: Context[ByteString]) = run(ctx) > override def onUpstreamFinish(ctx: Context[ByteString]) = > if (stash.isEmpty) ctx.finish() > else ctx.absorbTermination() // we still have bytes to emit > > private def run(ctx: Context[ByteString]): SyncDirective = > if (needed == -1) { > // are we at a boundary? then figure out next length > if (stash.length < 4) pullOrFinish(ctx) > else { > needed = stash.iterator.getInt > stash = stash.drop(4) > run(ctx) // cycle back to possibly already emit the next chunk > } > } else if (stash.length < needed) { > // we are in the middle of a message, need more bytes > pullOrFinish(ctx) > } else { > // we have enough to emit at least one message, so do it > val emit = stash.take(needed) > stash = stash.drop(needed) > needed = -1 > ctx.push(emit) > } > /* > * After having called absorbTermination() we cannot pull any more, so > if we need > * more data we will just have to give up. > */ > private def pullOrFinish(ctx: Context[ByteString]) = > if (ctx.isFinishing) ctx.finish() > else ctx.pull() > } > val outbound = b.add(Flow[ByteString].map(addLengthHeader)) > val inbound = b.add(Flow[ByteString].transform(() => new FrameParser)) > BidiShape.fromFlows(outbound, inbound) > }) Does anybody see the problem here? I'm quite new with akka, so am I missing something? Thanks, Qux -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.