[akka-user] Re: How to split an inbound stream on a delimiter character using Akka Streams

2014-09-05 Thread Chris Baxter
Thanks Viktor.  That certainly looks more succinct.  My actual use case 
(this one was intentionally simplified to make the example easier) is that 
I have messages coming in in protobuf format, each one preceded by a length 
indicator.  So my Transformer basically toggles back and forth between two 
states; looking for the length of the next message or reading the current 
message.  I got the code working correctly but it's definitely verbose. 
 It's recursive (and tail recursive at that) but it still needs 
improvement.  I want to add sufficient unit testing to it and then I want 
to start to tighten up the internals of the code.  I will use this code 
here as a guideline.  Thanks.
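
The two-state approach described above (look for the length of the next message, then read that many bytes) can be sketched roughly as follows. This is a minimal illustration and not the code from this thread. It assumes a 4-byte big-endian length prefix, which the thread does not specify, and it uses only the standard library (`Vector[Byte]` instead of `ByteString`) so that it runs without Akka. The names `LengthPrefixedFramer` and `frame` are hypothetical.

```scala
import java.nio.ByteBuffer

// Hypothetical framer. Assumption: every message is preceded by a 4-byte
// big-endian length; the thread does not say how the length is encoded.
final class LengthPrefixedFramer {
  private val HeaderSize = 4
  private var buffer: Vector[Byte] = Vector.empty

  /** Feed one chunk; returns every message completed by it, in order. */
  def onNext(chunk: Array[Byte]): List[Array[Byte]] = {
    buffer = buffer ++ chunk
    drain(Nil)
  }

  @annotation.tailrec
  private def drain(acc: List[Array[Byte]]): List[Array[Byte]] =
    if (buffer.length < HeaderSize) acc.reverse // state 1: waiting for a full length header
    else {
      val length = ByteBuffer.wrap(buffer.take(HeaderSize).toArray).getInt
      if (buffer.length < HeaderSize + length) acc.reverse // state 2: waiting for the rest of the body
      else {
        val (frame, rest) = buffer.drop(HeaderSize).splitAt(length)
        buffer = rest
        drain(frame.toArray :: acc) // a chunk may complete several messages
      }
    }
}

object LengthPrefixedFramer {
  /** Helper for trying it out: prefixes a UTF-8 string with its 4-byte length. */
  def frame(s: String): Array[Byte] = {
    val body = s.getBytes("UTF-8")
    ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array()
  }
}
```

Feeding the bytes of `frame("foo") ++ frame("quux")` in two arbitrary pieces should yield nothing until the first body is complete, and then both messages in order. The same two states map onto a Transformer's `onNext` by returning the drained list.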

On Wednesday, September 3, 2014 8:15:33 AM UTC-4, Chris Baxter wrote:

 Posted this on Stackoverflow but haven't seen any activity on it so I 
 figured I'd post it here as well.

 I've been playing around with the experimental Akka Streams API a bit and 
 I have a use case that I wanted to see how to implement.  For my use case, 
 I have a `StreamTcp` based `Flow` that is being fed from binding the input 
 stream of connections to my server socket.  The Flow that I have is based 
 on `ByteString` data coming into it.  The data that is coming in is going 
 to have a delimiter in it that means I should treat everything before the 
 delimiter as one message and everything after and up to the next delimiter 
 as the next message.  So playing around with a simpler example, using no 
 sockets and just static text, this is what I came up with:

 import akka.actor.ActorSystem
 import akka.stream.{ FlowMaterializer, MaterializerSettings }
 import akka.stream.scaladsl.Flow
 import scala.util.{ Failure, Success }
 import akka.util.ByteString

 object BasicTransformation {

   def main(args: Array[String]): Unit = {
     implicit val system = ActorSystem("Sys")

     val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

     Flow(data).
       // Start a new sub-flow at every '.' byte
       splitWhen(c => c == '.').
       foreach { producer =>
         // Rebuild each sub-flow into a String, dropping the delimiter itself
         Flow(producer).
           filter(c => c != '.').
           fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
           map(_.toString).
           filter(!_.isEmpty).
           foreach(println(_)).
           consume(FlowMaterializer(MaterializerSettings()))
       }.
       onComplete(FlowMaterializer(MaterializerSettings())) {
         case any =>
           system.shutdown
       }
   }
 }

 The main function on the `Flow` that I found to accomplish my goal was 
 `splitWhen`, which then produces additional sub-flows, one for each message 
 per that `.` delimiter.  I then process each sub-flow with another pipeline 
 of steps, finally printing the individual messages at the end.

 This all seems a bit verbose, to accomplish what I thought to be a pretty 
 simple and common use case.  So my question is, is there a cleaner and less 
 verbose way to do this or is this the correct and preferred way to split a 
 stream up by a delimiter?

 The link to the SO question is: 
 http://stackoverflow.com/questions/25631099/how-to-split-an-inbound-stream-on-a-delimiter-character-using-akka-streams


-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: How to split an inbound stream on a delimiter character using Akka Streams

2014-09-04 Thread √iktor Ҡlang
Hi Chris,

I think we can shrink the code a bit:

scala> class DotChopper extends Transformer[ByteString, ByteString] {
     |   var buffer: ByteString = ByteString.empty
     |   def chop(find: Byte, input: ByteString, startAt: Int, res: List[ByteString]): (List[ByteString], ByteString) =
     |     input.indexOf(find, startAt) match {
     |       case -1 => (res.reverse, input.compact)
     |       case x  =>
     |         val chunk = input.take(x)
     |         chop(find, input.drop(x + 1), 0, if (chunk.isEmpty) res else chunk :: res)
     |     }
     |   override def onNext(msg: ByteString) = {
     |     val buf = buffer
     |     val (output, rest) = chop('.', buf ++ msg, buf.length, Nil)
     |     buffer = rest
     |     output
     |   }
     |   // override onComplete too if you want to flush the buffer onComplete
     | }
defined class DotChopper

scala> new DotChopper
res21: DotChopper = DotChopper@11939712

scala> res21.onNext(ByteString(""))
res22: List[akka.util.ByteString] = List()

scala> res21.onNext(ByteString(""))
res23: List[akka.util.ByteString] = List()

scala> res21.onNext(ByteString("foo"))
res24: List[akka.util.ByteString] = List()

scala> res21.onNext(ByteString("bar"))
res25: List[akka.util.ByteString] = List()

scala> res21.onNext(ByteString("."))
res26: List[akka.util.ByteString] = List(ByteString(102, 111, 111, 98, 97, 114))

scala> res26.map(_.utf8String)
res27: List[String] = List(foobar)

scala> res21.onNext(ByteString("...p..i.g...d.o.g."))
res28: List[akka.util.ByteString] = List(ByteString(112), ByteString(105), ByteString(103), ByteString(100), ByteString(111), ByteString(103))

scala> res28.map(_.utf8String)
res29: List[String] = List(p, i, g, d, o, g)

scala> res21.onNext(ByteString(""))
res30: List[akka.util.ByteString] = List()
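
The comment in the class above mentions flushing the buffer on completion. A minimal sketch of that idea, assuming a standard-library stand-in (`Vector[Byte]` instead of `ByteString`, and a plain class instead of a Transformer) so that it runs without Akka; `DelimiterChopper` and its `onComplete` method are hypothetical names, not the experimental API:

```scala
// Hypothetical stand-in for the DotChopper idea, standard library only.
final class DelimiterChopper(delimiter: Byte) {
  // Bytes seen since the last delimiter
  private var buffer: Vector[Byte] = Vector.empty

  // Splits input on the delimiter, skipping empty chunks; returns the
  // completed chunks plus the unfinished remainder.
  @annotation.tailrec
  private def chop(input: Vector[Byte], startAt: Int,
                   res: List[Vector[Byte]]): (List[Vector[Byte]], Vector[Byte]) =
    input.indexOf(delimiter, startAt) match {
      case -1 => (res.reverse, input)
      case x =>
        val chunk = input.take(x)
        chop(input.drop(x + 1), 0, if (chunk.isEmpty) res else chunk :: res)
    }

  def onNext(msg: Vector[Byte]): List[Vector[Byte]] = {
    // The old buffer holds no delimiter, so searching can start at its end
    val (output, rest) = chop(buffer ++ msg, buffer.length, Nil)
    buffer = rest
    output
  }

  /** Call once at end of stream: emits whatever is still buffered. */
  def onComplete(): List[Vector[Byte]] = {
    val rest = buffer
    buffer = Vector.empty
    if (rest.isEmpty) Nil else List(rest)
  }
}
```

Without the flush, a final message that is not followed by a delimiter would be silently dropped when the stream ends. Whether that is desirable depends on the protocol: a trailing fragment may equally be a truncated message that should be discarded.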



[akka-user] Re: How to split an inbound stream on a delimiter character using Akka Streams

2014-09-03 Thread Chris Baxter
Thanks for the suggestions, Viktor and Endre.  I will try Viktor's chop 
solution, look into Endre's Transformer solution (and the decoding DSL), 
and then post back with my results.



[akka-user] Re: How to split an inbound stream on a delimiter character using Akka Streams

2014-09-03 Thread Chris Baxter
So transform worked for me.  Here is my quick and dirty impl of a 
Transformer and then using that Transformer in a Flow.  Thanks again for 
the help guys.

// Uses the imports from the original example, plus akka.util.ByteStringBuilder
// and the experimental Transformer class.
class PeriodDelimitedTransformer extends Transformer[ByteString, String] {
  // Bytes received since the last delimiter
  val buffer = new ByteStringBuilder

  def onNext(msg: ByteString) = {
    val msgString = msg.utf8String
    val delimIndex = msgString.indexOf('.')
    if (delimIndex == -1) {
      // No delimiter in this chunk: keep buffering, emit nothing
      buffer.append(msg)
      List.empty
    } else {
      // Limit -1 keeps trailing empty strings, so parts.last is always the
      // unfinished remainder ("" when the chunk ends with the delimiter).
      // Without it, a chunk such as "." splits into an empty array.
      val parts = msgString.split("\\.", -1)

      // The first piece completes whatever was already buffered
      buffer.putBytes(parts.head.getBytes("UTF-8"))
      val currentPiece = buffer.result.utf8String
      val otherPieces = parts.tail.dropRight(1).toList

      // Start the next message with the remainder
      buffer.clear
      buffer.putBytes(parts.last.getBytes("UTF-8"))

      currentPiece :: otherPieces
    }
  }
}

object BasicTransformation {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    implicit val mater = FlowMaterializer(MaterializerSettings())

    val data = List(
      ByteString("Lorem Ipsum is"),
      ByteString(" simply.Dummy text of.The prin"),
      ByteString("ting.And typesetting industry."))
    Flow(data).transform(new PeriodDelimitedTransformer).foreach(println(_))
  }
}
