[akka-user] How to split an inbound stream on a delimiter character using Akka Streams

2014-09-03 Thread Chris Baxter
I posted this on Stack Overflow but haven't seen any activity on it, so I 
figured I'd post it here as well.

I've been playing around with the experimental Akka Streams API a bit and I 
have a use case that I wanted to see how to implement.  For my use case, I 
have a `StreamTcp` based `Flow` that is being fed from binding the input 
stream of connections to my server socket.  The Flow that I have is based 
on `ByteString` data coming into it.  The data that is coming in is going 
to have a delimiter in it that means I should treat everything before the 
delimiter as one message and everything after and up to the next delimiter 
as the next message.  So playing around with a simpler example, using no 
sockets and just static text, this is what I came up with:

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString

object BasicTransformation {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
    Flow(data).
      splitWhen(c => c == '.').
      foreach { producer =>
        Flow(producer).
          filter(c => c != '.').
          fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
          map(_.toString).
          filter(!_.isEmpty).
          foreach(println(_)).
          consume(FlowMaterializer(MaterializerSettings()))
      }.
      onComplete(FlowMaterializer(MaterializerSettings())) {
        case any =>
          system.shutdown
      }
  }
}

The main function on the `Flow` that I found to accomplish my goal was 
`splitWhen`, which then produces additional sub-flows, one for each message 
per that `.` delimiter.  I then process each sub-flow with another pipeline 
of steps, finally printing the individual messages at the end.

This all seems a bit verbose, to accomplish what I thought to be a pretty 
simple and common use case.  So my question is, is there a cleaner and less 
verbose way to do this or is this the correct and preferred way to split a 
stream up by a delimiter?

The link to the SO question is:
http://stackoverflow.com/questions/25631099/how-to-split-an-inbound-stream-on-a-delimiter-character-using-akka-streams

-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] How to split an inbound stream on a delimiter character using Akka Streams

2014-09-03 Thread Endre Varga
Hi Chris,

Turning the ByteString into a stream of Bytes is very inefficient. I would go
with a stream of ByteStrings (chunks) instead. In these cases a Transformer
step is the best option, buffering up as many chunks as needed and
then emitting as many lines as you have parsed.
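
The buffering that such a step would do can be sketched independently of the streams API. What follows is a minimal sketch, assuming `akka-actor` is on the classpath for `ByteString`. The names `DelimiterFramer`, `onNext` and `onComplete` are hypothetical, chosen for illustration, and are not the signatures of the experimental `Transformer` class.

```scala
import akka.util.ByteString

// Hypothetical helper, not part of Akka: buffers incoming chunks and emits
// every message that has been completed by a delimiter.
final class DelimiterFramer(delimiter: Byte) {
  // Bytes received so far that are not yet terminated by a delimiter.
  private var buffer: ByteString = ByteString.empty

  // Feed one chunk; returns the messages completed by it. The delimiter is
  // stripped and empty messages are skipped.
  def onNext(chunk: ByteString): Vector[ByteString] = {
    buffer = buffer ++ chunk
    var out = Vector.empty[ByteString]
    var idx = buffer.indexOf(delimiter)
    while (idx >= 0) {
      val message = buffer.take(idx)
      if (message.nonEmpty) out :+= message
      buffer = buffer.drop(idx + 1)
      idx = buffer.indexOf(delimiter)
    }
    out
  }

  // Call at end of stream; returns any unterminated trailing bytes.
  def onComplete(): Vector[ByteString] = {
    val rest = buffer
    buffer = ByteString.empty
    if (rest.isEmpty) Vector.empty else Vector(rest)
  }
}

object DelimiterFramerDemo {
  def main(args: Array[String]): Unit = {
    val framer = new DelimiterFramer('.'.toByte)
    // The delimiter that ends the first message only arrives in the second chunk.
    val chunks = List("Lorem Ipsum is sim", "ply.Dummy text", " of the printing.And")
      .map(ByteString(_))
    val messages = chunks.flatMap(framer.onNext) ++ framer.onComplete()
    messages.map(_.utf8String).foreach(println)
  }
}
```

Whether an unterminated tail should be emitted at end of stream or dropped depends on the protocol; this sketch emits it.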

Btw, there is a dormant and somewhat stale branch (that will be revived in
the future), which contains a decoding DSL for tasks similar to yours:
https://github.com/drewhk/akka/commit/e97cbe186ce0a8298695bedeac7ba6bf59df89bf

You can look at the test in that commit to see what kind of decoding will be
possible. It is not official code right now, and the API is a bit confusing
at first, but at least it shows the direction.

-Endre


Re: [akka-user] How to split an inbound stream on a delimiter character using Akka Streams

2014-09-03 Thread √iktor Ҡlang
import akka.util.ByteString

def chop(find: Byte, in: ByteString, res: Seq[ByteString] = Nil): Seq[ByteString] =
  in.indexOf(find) match {
    case -1 => res
    case x  =>
      val chunk = in.take(x)
      chop(find, in.drop(x + 1), if (chunk.isEmpty) res else res :+ chunk)
  }

scala> chop('.', ByteString())
res10: Seq[akka.util.ByteString] = List()

scala> chop('.', ByteString())
res11: Seq[akka.util.ByteString] = List()

scala> chop('.', ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")).map(_.utf8String)
res12: Seq[String] = List(Lorem Ipsum is simply, Dummy text of the printing, And typesetting industry)

Flow(data).mapConcat(bs => chop('.', bs)).etc
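
Note that `chop` returns only the messages that end in a delimiter: when `indexOf` yields -1, whatever is left of `in` is discarded. A minimal sketch of a variant that also hands back the unterminated tail, assuming `akka-actor` is on the classpath; the name `chopRest` and the tuple return type are hypothetical and not part of the code above.

```scala
import akka.util.ByteString
import scala.annotation.tailrec

object ChopWithRemainder {
  // Hypothetical variant of `chop`: besides the complete messages it also
  // returns the bytes after the last delimiter, which `chop` drops.
  @tailrec
  def chopRest(
      find: Byte,
      in: ByteString,
      res: Vector[ByteString] = Vector.empty): (Vector[ByteString], ByteString) =
    in.indexOf(find) match {
      case -1 => (res, in) // no further delimiter: `in` is the remainder
      case x =>
        val chunk = in.take(x)
        chopRest(find, in.drop(x + 1), if (chunk.isEmpty) res else res :+ chunk)
    }

  def main(args: Array[String]): Unit = {
    val (messages, rest) = chopRest('.'.toByte, ByteString("one.two.thr"))
    println(messages.map(_.utf8String)) // the complete messages "one" and "two"
    println(rest.utf8String)            // the unterminated tail "thr"
  }
}
```

The remainder can be prepended to the next chunk before splitting again, which is the state a buffering step has to carry between elements.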


-- 
Cheers,
√


Re: [akka-user] How to split an inbound stream on a delimiter character using Akka Streams

2014-09-03 Thread Chris Baxter
My bad ;)

Okay, I will proceed with a Transformer approach.  Thanks for the push in 
the right direction.

On Wednesday, September 3, 2014 10:04:30 AM UTC-4, √ wrote:

> That was not the thing you asked for, you had a single ByteString. :)
>
> If you want to do that then you need to create a Transformer and use the
> `transform` method.
>
> On Wed, Sep 3, 2014 at 3:55 PM, Chris Baxter cba...@gmail.com wrote:
>
>> Viktor, how would this work if the delimiter was not in the current
>> ByteString, meaning that it's coming in a subsequent ByteString and I need
>> to buffer this ByteString until the next one comes through?

