[akka-user] How to split an inbound stream on a delimiter character using Akka Streams
Posted this on Stackoverflow but haven't seen any activity on it so I figured I'd post it here as well.

I've been playing around with the experimental Akka Streams API a bit and I have a use case that I wanted to see how to implement. For my use case, I have a `StreamTcp` based `Flow` that is being fed from binding the input stream of connections to my server socket. The Flow that I have is based on `ByteString` data coming into it. The data that is coming in is going to have a delimiter in it that means I should treat everything before the delimiter as one message and everything after and up to the next delimiter as the next message. So playing around with a simpler example, using no sockets and just static text, this is what I came up with:

    import akka.actor.ActorSystem
    import akka.stream.{ FlowMaterializer, MaterializerSettings }
    import akka.stream.scaladsl.Flow
    import scala.util.{ Failure, Success }
    import akka.util.ByteString

    object BasicTransformation {

      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("Sys")

        val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

        Flow(data).
          splitWhen(c => c == '.').
          foreach{producer =>
            Flow(producer).
              filter(c => c != '.').
              fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
              map(_.toString).
              filter(!_.isEmpty).
              foreach(println(_)).
              consume(FlowMaterializer(MaterializerSettings()))
          }.
          onComplete(FlowMaterializer(MaterializerSettings())) {
            case any =>
              system.shutdown
          }
      }
    }

The main function on the `Flow` that I found to accomplish my goal was `splitWhen`, which then produces additional sub-flows, one for each message per that `.` delimiter. I then process each sub-flow with another pipeline of steps, finally printing the individual messages at the end. This all seems a bit verbose, to accomplish what I thought to be a pretty simple and common use case.
So my question is, is there a cleaner and less verbose way to do this or is this the correct and preferred way to split a stream up by a delimiter? The link to the SO question is: http://stackoverflow.com/questions/25631099/how-to-split-an-inbound-stream-on-a-delimiter-character-using-akka-streams -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] How to split an inbound stream on a delimiter character using Akka Streams
Hi Chris,

Casting the ByteString to a stream of Bytes is very inefficient. I would go with a stream of ByteStrings (chunks) instead. In these cases a Transformer step is the best option, buffering up as many chunks as needed and then emitting as many lines as you have parsed.

Btw, there is a dormant and somewhat stale branch (that will be revived in the future), which contains a decoding DSL for tasks similar to yours: https://github.com/drewhk/akka/commit/e97cbe186ce0a8298695bedeac7ba6bf59df89bf

You can look at the test in that PR to see what kind of decoding will be possible. It is not official code right now, and the API is a bit confusing at first, but at least it shows the direction.

-Endre

On Wed, Sep 3, 2014 at 2:15 PM, Chris Baxter cbax...@gmail.com wrote:
> So my question is, is there a cleaner and less verbose way to do this or is this the correct and preferred way to split a stream up by a delimiter?
Re: [akka-user] How to split an inbound stream on a delimiter character using Akka Streams
    import akka.util.ByteString

    // Splits `in` on every occurrence of `find`, skipping empty segments.
    // Bytes after the last delimiter are not returned.
    def chop(find: Byte, in: ByteString, res: Seq[ByteString] = Nil): Seq[ByteString] =
      in.indexOf(find) match {
        case -1 => res
        case x =>
          val chunk = in.take(x)
          chop(find, in.drop(x + 1), if (chunk.isEmpty) res else res :+ chunk)
      }

    scala> chop('.', ByteString())
    res10: Seq[akka.util.ByteString] = List()

    scala> chop('.', ByteString())
    res11: Seq[akka.util.ByteString] = List()

    scala> chop('.', ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")).map(_.utf8String)
    res12: Seq[String] = List(Lorem Ipsum is simply, Dummy text of the printing, And typesetting industry)

    Flow(data).mapConcat(bs => chop('.', bs)).etc

Cheers,
√
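One thing worth keeping in mind: `chop` looks at a single `ByteString` at a time. The sketch below repeats the same logic so it is self-contained and compares one whole input with the same bytes arriving in pieces. The chunk boundaries and the object name `ChopAcrossChunks` are made up for illustration, and only `akka.util.ByteString` is assumed to be on the classpath.

```scala
import akka.util.ByteString
import scala.annotation.tailrec

object ChopAcrossChunks {
  // Same logic as the chop function above, repeated for self-containment.
  @tailrec
  def chop(find: Byte, in: ByteString, res: Seq[ByteString] = Nil): Seq[ByteString] =
    in.indexOf(find) match {
      case -1 => res
      case x =>
        val chunk = in.take(x)
        chop(find, in.drop(x + 1), if (chunk.isEmpty) res else res :+ chunk)
    }

  def main(args: Array[String]): Unit = {
    val delimiter = '.'.toByte

    // The complete input in a single ByteString.
    val whole = ByteString("Lorem Ipsum.Dummy text.")
    // The same bytes, split at arbitrary (illustrative) boundaries.
    val chunks = List(ByteString("Lorem Ip"), ByteString("sum.Dummy te"), ByteString("xt."))

    println(chop(delimiter, whole).map(_.utf8String))
    // prints: List(Lorem Ipsum, Dummy text)

    // Chopping each chunk independently loses everything that has no
    // delimiter inside the same chunk.
    println(chunks.flatMap(chop(delimiter, _)).map(_.utf8String))
    // prints: List(sum, xt)
  }
}
```

So `mapConcat` with `chop` fits the case where each element already contains whole messages; anything that straddles two elements needs state carried between them.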
Re: [akka-user] How to split an inbound stream on a delimiter character using Akka Streams
My bad ;) Okay, I will proceed with a Transformer approach. Thanks for the push in the right direction.

On Wednesday, September 3, 2014 10:04:30 AM UTC-4, √ wrote:
> That was not the thing you asked for, you had a single ByteString. :) If you want to do that then you need to create a Transformer and use the `transform` method.
>
> On Wed, Sep 3, 2014 at 3:55 PM, Chris Baxter cba...@gmail.com wrote:
>> Viktor, how would this work if the delimiter was not in the current ByteString, meaning that it's coming in a subsequent ByteString and I need to buffer this ByteString until the next one comes through?
>>
>> On Wednesday, September 3, 2014 8:42:46 AM UTC-4, √ wrote:
>>> Flow(data).mapConcat(bs => chop('.', bs)).etc
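For readers following along, the buffering that a Transformer would have to do could look roughly like the sketch below. This is only an illustration of the state handling, not code from the thread: `DelimiterBuffer`, `push`, and `finish` are hypothetical names, and the wiring into the experimental `transform` step is deliberately left out because that API was still changing. Only `akka.util.ByteString` is assumed.

```scala
import akka.util.ByteString

// Hypothetical helper, not part of Akka: accumulates incoming chunks and
// emits every complete, delimiter-terminated message.
final class DelimiterBuffer(delimiter: Byte) {
  // Bytes received so far that are not yet terminated by a delimiter.
  private var pending: ByteString = ByteString.empty

  // Feed one chunk; returns the messages completed by it (possibly none).
  def push(chunk: ByteString): Vector[ByteString] = {
    pending = pending ++ chunk
    var out = Vector.empty[ByteString]
    var idx = pending.indexOf(delimiter)
    while (idx >= 0) {
      val message = pending.take(idx)
      if (message.nonEmpty) out :+= message // skip empty messages, as chop does
      pending = pending.drop(idx + 1)       // drop the delimiter itself
      idx = pending.indexOf(delimiter)
    }
    out
  }

  // Call at end of input: returns leftover bytes that never saw a delimiter.
  def finish(): Option[ByteString] = {
    val rest = pending
    pending = ByteString.empty
    if (rest.isEmpty) None else Some(rest)
  }
}

object DelimiterBufferDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new DelimiterBuffer('.'.toByte)
    // Illustrative chunk boundaries that cut through messages.
    val chunks = List("Lorem Ip", "sum is simply.Dummy te", "xt of the printing.And type")
      .map(ByteString(_))

    println(chunks.flatMap(buffer.push).map(_.utf8String))
    // prints: List(Lorem Ipsum is simply, Dummy text of the printing)

    println(buffer.finish().map(_.utf8String))
    // prints: Some(And type)
  }
}
```

In a stream, the per-element hook of the transformation step would call `push` and emit its result, and the end-of-stream hook would decide what to do with `finish()`: emit the remainder as a final message or discard it as incomplete.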