[akka-user] Re: streams: unroll -> sequence for flow -> reroll -- howto?
Posting my owner discovery for future readers/searchers, especially since it's so simple :) The short answer: splitWhen(ign => true) The code is probably not idiomatic, being my second short foray into scala. It does compile and run on my setup. Any general thoughts/improvements are appreciated :) package spss import scala.concurrent._ import scala.concurrent.duration._ import akka.actor.{ ActorSystem } import akka.stream._ import akka.stream.scaladsl._ object BatchSplitWhen { val system = ActorSystem.create("batch2") implicit val materializer = ActorMaterializer.create(system) def main(args: Array[String]): Unit = { val batchesStr = s"a word or two\nseparated by newlines\nmakes for a simple\nstreaming batch test." val batches = Source(batchesStr.split("\n").to[collection.immutable.Seq]) *val substreams = batches.splitWhen(s => true)* val toWords = substreams.mapConcat(_.split(" ").to[collection.immutable.Seq]) val toUpper = toWords.map(_.toUpperCase) val toLine = toUpper.reduce(_ + "-" + _) *val toLines = toLine.mergeSubstreams.runFold("")(_ + "\n" + _)* val res = Await.result(toLines, 5 seconds) println(res) system.terminate } } -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: streams: unroll -> sequence for flow -> reroll -- howto?
Thanks for the follow up -- the idea you posted is related, but not quite it. I wrote a self-contained working example below. What I do in this example is manually materialize a graph, and wrap it as a future, making it usable by mapAsync. *Ideal: to define an entire graph such that the materialized subgraphs in lineRewriter are instantiated and managed by akka itself.* Can the code below be converted to do such a thing? (Scala responses also great, but this happens to be java.) b --- // materialize entire subgraph as an async func public static CompletionStage lineRewriter(Materializer mat, String line) { return Source .from(Arrays.asList(line.split(" "))) // unroll .via(Flow.fromFunction(String::toUpperCase))// per-item processing .runReduce((acc, w) -> acc + " " + w, mat); // rollup } public static void main(String[] args) { final String batchesStr = "a word or two\nseparated by newlines\nmakes for a simple\nstreaming batch test."; final Sourcebatches = Source.from(Arrays.asList(batchesStr.split("\n"))); final Sink printResp = Sink.foreach(System.out::println); batches.mapAsync(1, line -> lineRewriter(materializer, line)).toMat(printResp, Keep.right()) .run(materializer) .thenApply(d -> { system.terminate(); return d; }) .exceptionally(t -> { system.terminate(); return Done.getInstance(); }); } On Wednesday, May 10, 2017 at 1:53:43 PM UTC+2, Julian Howarth wrote: > > I may have misunderstood what you're trying to do but I think you can > probably use expand for this. In builder pseudocode, something like: > > Flow ~> Unzip ~> ~> Zip > ~> Flow.expand(Iterator.continually(_))~> > -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] streams: unroll -> sequence for flow -> reroll -- howto?
Hi, I am new to akka streams, and working on a project having both real-time and batch needs. In the batch cases, I need to take context (e.g., requestID), from the initial request, and apply it at the end of batch-item processing. The batch-item subflow is by itself rather simple: (parse, mapConcat, transform). The item transforms comes from the real-time context. It is the folding into an output format that is less obvious. That code needs the request ID and other context from the original request to generate an output byte stream. What I want to do is treat the context and input bytes as a tuple, unzip them in the beginning, zip them at the fold stage. I need to understand how to map each input byte stream into a sequence of stream elements in a sub-flow, and fold each subsequence in its natural grouping. mapConcat flattens, thus dropping the boundaries between adjacent batch requests. I could use map, and materialize a new stream within each batch handler for its items, and send the folded result as the output of the flow... but that seems strange, and likely to be already available. (Especially since GroupBy and SubFlow seem closely related...) Does this exist? Thanks, b -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.