[akka-user] Re: How to split an inbound stream on a delimiter character using Akka Streams
Thanks Viktor. That certainly looks more succinct. My actual use case (this one was intentionally simplified to make the example easier) is that I have messages arriving in protobuf format, each one preceded by a length indicator. So my Transformer basically toggles back and forth between two states: looking for the length of the next message, or reading the current message. I got the code working correctly, but it's definitely verbose. It's recursive (tail recursive, at that), but it still needs improvement. I want to add sufficient unit tests to it and then start tightening up the internals of the code. I will use this code here as a guideline. Thanks.

On Wednesday, September 3, 2014 8:15:33 AM UTC-4, Chris Baxter wrote:

Posted this on Stack Overflow but haven't seen any activity on it, so I figured I'd post it here as well.

I've been playing around with the experimental Akka Streams API a bit and I have a use case that I wanted to see how to implement. For my use case, I have a `StreamTcp`-based `Flow` that is being fed from binding the input stream of connections to my server socket. The Flow that I have is based on `ByteString` data coming into it. The incoming data will contain a delimiter: everything before the delimiter should be treated as one message, and everything after it, up to the next delimiter, as the next message. So, playing around with a simpler example that uses no sockets and just static text, this is what I came up with:

    import akka.actor.ActorSystem
    import akka.stream.{ FlowMaterializer, MaterializerSettings }
    import akka.stream.scaladsl.Flow
    import scala.util.{ Failure, Success }
    import akka.util.ByteString

    object BasicTransformation {

      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("Sys")

        val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

        Flow(data).
          splitWhen(c => c == '.').
          foreach { producer =>
            Flow(producer).
              filter(c => c != '.').
              fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
              map(_.toString).
              filter(!_.isEmpty).
              foreach(println(_)).
              consume(FlowMaterializer(MaterializerSettings()))
          }.
          onComplete(FlowMaterializer(MaterializerSettings())) {
            case any => system.shutdown
          }
      }
    }

The main function on the `Flow` that I found to accomplish my goal was `splitWhen`, which produces additional sub-flows, one for each message delimited by `.`. I then process each sub-flow with another pipeline of steps, finally printing the individual messages at the end.

This all seems a bit verbose for what I thought was a pretty simple and common use case. So my question is: is there a cleaner, less verbose way to do this, or is this the correct and preferred way to split a stream by a delimiter?

The link to the SO question is: http://stackoverflow.com/questions/25631099/how-to-split-an-inbound-stream-on-a-delimiter-character-using-akka-streams

--
Read the docs: http://akka.io/docs/
Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups Akka User List group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
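Chris's reply at the top of this thread describes a transformer that alternates between two states: reading the length of the next message, and reading the message itself. Below is a minimal, Akka-free sketch of that state machine. It is hypothetical and assumes each length indicator is a 4-byte big-endian integer. Protobuf's own `writeDelimitedTo` convention uses a varint length prefix instead, so a real decoder for that format would need a varint reader in the `ReadingLength` state. `LengthPrefixedFramer`, `encode` and the demo object are illustrative names, not an Akka API.

```scala
import scala.annotation.tailrec

// Hypothetical sketch: each frame is assumed to be a 4-byte big-endian length
// followed by that many payload bytes (protobuf's writeDelimitedTo uses a varint instead).
final class LengthPrefixedFramer {
  private sealed trait State
  private case object ReadingLength extends State
  private final case class ReadingMessage(length: Int) extends State

  private var state: State = ReadingLength
  private var buffer: Vector[Byte] = Vector.empty

  /** Appends one chunk and returns every frame that is now complete, in order. */
  def onNext(chunk: Seq[Byte]): List[Vector[Byte]] = {
    buffer = buffer ++ chunk
    step(Nil).reverse
  }

  // Toggles between the two states until the buffer runs short of bytes.
  @tailrec
  private def step(acc: List[Vector[Byte]]): List[Vector[Byte]] = state match {
    case ReadingLength if buffer.length >= 4 =>
      val length = buffer.take(4).foldLeft(0)((n, b) => (n << 8) | (b & 0xff))
      buffer = buffer.drop(4)
      state = ReadingMessage(length)
      step(acc)
    case ReadingMessage(length) if buffer.length >= length =>
      val (frame, rest) = buffer.splitAt(length)
      buffer = rest
      state = ReadingLength
      step(frame :: acc)
    case _ =>
      acc // not enough bytes buffered yet; wait for the next chunk
  }
}

object LengthPrefixedFramer {
  /** Helper for trying the framer out: prefixes a payload with its 4-byte length. */
  def encode(payload: Array[Byte]): Vector[Byte] = {
    val n = payload.length
    Vector((n >>> 24).toByte, (n >>> 16).toByte, (n >>> 8).toByte, n.toByte) ++ payload
  }
}

object LengthPrefixedFramerDemo {
  def main(args: Array[String]): Unit = {
    val framer = new LengthPrefixedFramer
    val wire = LengthPrefixedFramer.encode("hello".getBytes("UTF-8")) ++
      LengthPrefixedFramer.encode("world!".getBytes("UTF-8"))
    val (first, second) = wire.splitAt(7) // cut the stream in the middle of "hello"
    println(framer.onNext(first).map(f => new String(f.toArray, "UTF-8")))  // List()
    println(framer.onNext(second).map(f => new String(f.toArray, "UTF-8"))) // List(hello, world!)
  }
}
```

Keeping the state and the leftover bytes in the framer means a chunk boundary can fall anywhere, including inside the length prefix itself. Making `step` tail recursive lets a single chunk yield any number of frames without growing the stack.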
Re: [akka-user] Re: How to split an inbound stream on a delimiter character using Akka Streams
Hi Chris,

I think we can shrink the code a bit:

    scala> class DotChopper extends Transformer[ByteString, ByteString] {
         |   var buffer: ByteString = ByteString.empty
         |   def chop(find: Byte, input: ByteString, startAt: Int, res: List[ByteString]): (List[ByteString], ByteString) = input.indexOf(find, startAt) match {
         |     case -1 => (res.reverse, input.compact)
         |     case x =>
         |       val chunk = input.take(x)
         |       chop(find, input.drop(x + 1), 0, if (chunk.isEmpty) res else chunk :: res)
         |   }
         |   override def onNext(msg: ByteString) = {
         |     val buf = buffer
         |     val (output, rest) = chop('.', buf ++ msg, buf.length, Nil)
         |     buffer = rest
         |     output
         |   }
         |   //override onComplete too if you want to flush the buffer onComplete
         | }
    defined class DotChopper

    scala> new DotChopper
    res21: DotChopper = DotChopper@11939712

    scala> res21.onNext(ByteString())
    res22: List[akka.util.ByteString] = List()

    scala> res21.onNext(ByteString())
    res23: List[akka.util.ByteString] = List()

    scala> res21.onNext(ByteString("foo"))
    res24: List[akka.util.ByteString] = List()

    scala> res21.onNext(ByteString("bar"))
    res25: List[akka.util.ByteString] = List()

    scala> res21.onNext(ByteString("."))
    res26: List[akka.util.ByteString] = List(ByteString(102, 111, 111, 98, 97, 114))

    scala> res26.map(_.utf8String)
    res27: List[String] = List(foobar)

    scala> res21.onNext(ByteString("...p..i.g...d.o.g."))
    res28: List[akka.util.ByteString] = List(ByteString(112), ByteString(105), ByteString(103), ByteString(100), ByteString(111), ByteString(103))

    scala> res28.map(_.utf8String)
    res29: List[String] = List(p, i, g, d, o, g)

    scala> res21.onNext(ByteString())
    res30: List[akka.util.ByteString] = List()

On Wed, Sep 3, 2014 at 6:50 PM, Chris Baxter cbax...@gmail.com wrote:

So transform worked for me. Here is my quick and dirty impl of a Transformer and then using that Transformer in a Flow. Thanks again for the help guys.
    class PeriodDelimitedTransformer extends Transformer[ByteString,String]{
      val buffer = new ByteStringBuilder

      def onNext(msg:ByteString) = {
        val msgString = msg.utf8String
        val delimIndex = msgString.indexOf('.')
        if (delimIndex == -1){
          // no delimiter yet: keep accumulating
          buffer.append(msg)
          List.empty
        }
        else{
          // split drops trailing empty strings, hence the endsWithDelim check below
          val parts = msgString.split("\\.")
          val endsWithDelim = msgString.endsWith(".")

          buffer.putBytes(parts.head.getBytes())
          val currentPiece = buffer.result.utf8String
          val otherPieces = parts.tail.dropRight(1).toList

          buffer.clear
          val lastPart =
            if (endsWithDelim){
              // caveat: if the chunk's only delimiters are at its end (e.g. "abc."),
              // parts has one element, so parts.last (== parts.head) is emitted again
              List(parts.last)
            }
            else{
              buffer.putBytes(parts.last.getBytes())
              List.empty
            }

          val result = currentPiece :: otherPieces ::: lastPart
          result
        }
      }
    }

    object BasicTransformation {

      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("Sys")
        implicit val mater = FlowMaterializer(MaterializerSettings())

        val data = List(
          ByteString("Lorem Ipsum is"),
          ByteString(" simply.Dummy text of.The prin"),
          ByteString("ting.And typesetting industry."))

        Flow(data).transform(new PeriodDelimitedTransformer).foreach(println(_))
      }
    }
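The PeriodDelimitedTransformer above splits with `String.split("\\.")`. The default limit of 0 drops trailing empty strings, which is why the transformer needs the `endsWithDelim` bookkeeping. That leaves two edge cases:

- A chunk whose only delimiters are at its end, such as `"abc."`, yields a one-element array. In that case `parts.head` and `parts.last` are the same element, so the piece is emitted a second time.
- A chunk consisting only of periods yields an empty array, so `parts.head` throws.

One way around both is to pass a limit of -1, which keeps trailing empty strings. The last element is then always the unfinished remainder. The sketch below assumes chunks arrive already decoded as `String`s and leaves Akka's Transformer out entirely. `PeriodSplitterSketch` is a hypothetical name.

```scala
// Hypothetical String-level splitter (not an Akka API). With limit -1, split keeps
// trailing empty strings, so the last element is always the unfinished remainder.
final class PeriodSplitterSketch {
  private val pending = new StringBuilder

  /** Appends one decoded chunk and returns the messages it completed. */
  def onNext(msg: String): List[String] = {
    val parts = (pending.toString + msg).split("\\.", -1) // never empty with limit -1
    pending.clear()
    pending.append(parts.last)           // text after the last '.', possibly ""
    parts.init.filter(_.nonEmpty).toList // complete messages, empty ones dropped
  }
}

object PeriodSplitterDemo {
  def main(args: Array[String]): Unit = {
    val splitter = new PeriodSplitterSketch
    // Same three chunks as in the example above.
    List("Lorem Ipsum is", " simply.Dummy text of.The prin", "ting.And typesetting industry.")
      .flatMap(splitter.onNext)
      .foreach(println)
  }
}
```

Like the original, this decodes each chunk separately, so it assumes no chunk boundary falls inside a multi-byte UTF-8 character. Splitting at the byte level, as Viktor's DotChopper does, avoids that assumption, because the byte for `.` (0x2E) never occurs inside a multi-byte UTF-8 sequence.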
[akka-user] Re: How to split an inbound stream on a delimiter character using Akka Streams
Thanks for the suggestions, Viktor and Endre. I will try Viktor's chop solution, look into Endre's Transformer solution (and the decoding DSL), and then post back with my results.
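Viktor's DotChopper depends on the `Transformer` trait from the experimental Akka Streams release used in this thread, but its buffering logic can be tried out without Akka. The following is a hypothetical, Akka-free mirror of it. `Vector[Byte]` stands in for `ByteString`, and `DotChopperSketch` is a made-up name. Fed the same inputs as the REPL session above, it produces the same chunks.

```scala
import scala.annotation.tailrec

// Hypothetical, Akka-free mirror of Viktor's DotChopper: Vector[Byte] stands in
// for ByteString so the buffering logic can be run on its own.
final class DotChopperSketch(delimiter: Byte = '.'.toByte) {
  private var buffer: Vector[Byte] = Vector.empty

  @tailrec
  private def chop(input: Vector[Byte], startAt: Int, res: List[Vector[Byte]]): (List[Vector[Byte]], Vector[Byte]) =
    input.indexOf(delimiter, startAt) match {
      case -1 => (res.reverse, input) // no more delimiters: the rest stays buffered
      case x =>
        val chunk = input.take(x)
        // empty chunks (from consecutive delimiters) are skipped, as in DotChopper
        chop(input.drop(x + 1), 0, if (chunk.isEmpty) res else chunk :: res)
    }

  /** Appends one chunk and returns the complete, non-empty messages it closed. */
  def onNext(msg: Seq[Byte]): List[Vector[Byte]] = {
    val buf = buffer
    // Start searching at buf.length: the buffered bytes contain no delimiter.
    val (output, rest) = chop(buf ++ msg, buf.length, Nil)
    buffer = rest
    output
  }
}

object DotChopperDemo {
  private def bytes(s: String): Vector[Byte] = s.getBytes("UTF-8").toVector

  def main(args: Array[String]): Unit = {
    val chopper = new DotChopperSketch()
    // Same inputs as the REPL session above.
    for (in <- List("", "", "foo", "bar", ".", "...p..i.g...d.o.g.", ""))
      println(chopper.onNext(bytes(in)).map(b => new String(b.toArray, "UTF-8")))
  }
}
```

Resuming the search at `buf.length` means previously buffered bytes are never searched again. As the comment in DotChopper notes, anything still buffered when the stream completes would need to be flushed by an `onComplete`-style hook, and this sketch does not model completion.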