Re: [akka-user] Merge matching elements of multiple streams into Tuples
Hey Patrik! I got it working with a running test :) Still need to properly stress-test it, though. see my repo <https://github.com/FloWi/test-groupstage-akka-streams/> Had a lot of pen-and-paper design upfront, to get the merge-logic right, but the implementation was pretty straightforward. I really like the extensibility of akka-streams. If you have any suggestions, let me know. Feel free to use this as an example for extensibility in your documentation. Florian Am Freitag, 20. November 2015 14:25:03 UTC+1 schrieb Florian Witteler: > > I found out what the problem is. > The UniformFanInShape isn't available in a scala-class in the sourcecode > since it is being generated from a template. > akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template > > I generated the sourcecode and try to work through the logic now. > Thanks for pointing me in the right direction! > > Am Freitag, 20. November 2015 14:16:50 UTC+1 schrieb Patrik Nordwall: >> >> >> >> On Fri, Nov 20, 2015 at 2:09 PM, Florian Witteler < >> florian@googlemail.com> wrote: >> >>> Thanks for getting back to me. >>> No need to be sorry - the docs of the akka-framework are imho the most >>> qualitative docs of all frameworks I know. And quality takes time. >>> >>> After looking through the >>> 'releasing-akka-stream-and-http-experimental-2.0-M1' branch I think I can >>> take akka.stream.scaladsl.Concat as a blueprint for my solution, but it >>> extends >>> GraphStage[UniformFanInShape[T, T]] - a class neither me nor IDEA are >>> able to find. >>> Would you please help me out here? >>> >> >> Are you using version 2.0-M1, it's new in that release ? >> That is also the reason I pointed to that specific branch, so that the >> code would match up with the 2.0-M1 release. >> >> /Patrik >> >> >>> >>> Florian >>> >>> >>> Am Freitag, 20. November 2015 13:40:33 UTC+1 schrieb Patrik Nordwall: >>>> >>>> On Fri, Nov 20, 2015 at 12:03 PM, Florian Witteler < >>>> florian@googlemail.com> wrote: >>>> >>>> I have an elasticsearch-database with all our articles. Each tenant has >>>>> its own index. These indexes are denormalized (the common properties of >>>>> an >>>>> article are duplicated into each index) >>>>> We are about to implement streaming of articles with akka http and use >>>>> the elasticsearch-drivers' reactive streaming api. >>>>> So far so good - set aside the enthusiasm of the awesomeness of >>>>> handling breakpressure from the client through two modules back to the >>>>> database ;-) >>>>> >>>>> For another project we need an export of the articles of *all* >>>>> tenants in another fashion. >>>>> In my module I'm going to query the articles for each tenant (and have >>>>> a List of akka.stream.scaladsl.Source) >>>>> >>>>> The export has to be in a format like this: >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> The Details doesn't matter - just think of Article 1 .. n Localizations >>>>> >>>>> Now consider this constellation of articles in the streams. >>>>> >>>>> de 1 2 3 4 5 6 7 8 EOF >>>>> es 1 2 4 5 6 8 EOF >>>>> cz 1 2 4 6 7 8 EOF >>>>> >>>>> As you can see not every tenant has the same articles. >>>>> I'd like to group several streams together and generate tuples of >>>>> matching articles. What I want is s.th. like that. >>>>> >>>>> 1 -> (de,es,cz) >>>>> 2 -> (de,es,cz) >>>>> 3 -> (de) >>>>> 4 -> (de,es,cz) >>>>> 5 -> (de,es) >>>>> 6 -> (de,es,cz) >>>>> 7 -> (de,cz) >>>>> 8 -> (de,es,cz) >>>>> >>>>> I guess a GraphStage is what I need. Si
Re: [akka-user] Merge matching elements of multiple streams into Tuples
I found out what the problem is. The UniformFanInShape isn't available in a scala-class in the sourcecode since it is being generated from a template. akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template I generated the sourcecode and try to work through the logic now. Thanks for pointing me in the right direction! Am Freitag, 20. November 2015 14:16:50 UTC+1 schrieb Patrik Nordwall: > > > > On Fri, Nov 20, 2015 at 2:09 PM, Florian Witteler < > florian@googlemail.com > wrote: > >> Thanks for getting back to me. >> No need to be sorry - the docs of the akka-framework are imho the most >> qualitative docs of all frameworks I know. And quality takes time. >> >> After looking through the >> 'releasing-akka-stream-and-http-experimental-2.0-M1' branch I think I can >> take akka.stream.scaladsl.Concat as a blueprint for my solution, but it >> extends >> GraphStage[UniformFanInShape[T, T]] - a class neither me nor IDEA are >> able to find. >> Would you please help me out here? >> > > Are you using version 2.0-M1, it's new in that release ? > That is also the reason I pointed to that specific branch, so that the > code would match up with the 2.0-M1 release. > > /Patrik > > >> >> Florian >> >> >> Am Freitag, 20. November 2015 13:40:33 UTC+1 schrieb Patrik Nordwall: >>> >>> On Fri, Nov 20, 2015 at 12:03 PM, Florian Witteler < >>> florian@googlemail.com> wrote: >>> >>> I have an elasticsearch-database with all our articles. Each tenant has >>>> its own index. These indexes are denormalized (the common properties of an >>>> article are duplicated into each index) >>>> We are about to implement streaming of articles with akka http and use >>>> the elasticsearch-drivers' reactive streaming api. >>>> So far so good - set aside the enthusiasm of the awesomeness of >>>> handling breakpressure from the client through two modules back to the >>>> database ;-) >>>> >>>> For another project we need an export of the articles of *all* tenants >>>> in another fashion. >>>> In my module I'm going to query the articles for each tenant (and have >>>> a List of akka.stream.scaladsl.Source) >>>> >>>> The export has to be in a format like this: >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> The Details doesn't matter - just think of Article 1 .. n Localizations >>>> >>>> Now consider this constellation of articles in the streams. >>>> >>>> de 1 2 3 4 5 6 7 8 EOF >>>> es 1 2 4 5 6 8 EOF >>>> cz 1 2 4 6 7 8 EOF >>>> >>>> As you can see not every tenant has the same articles. >>>> I'd like to group several streams together and generate tuples of >>>> matching articles. What I want is s.th. like that. >>>> >>>> 1 -> (de,es,cz) >>>> 2 -> (de,es,cz) >>>> 3 -> (de) >>>> 4 -> (de,es,cz) >>>> 5 -> (de,es) >>>> 6 -> (de,es,cz) >>>> 7 -> (de,cz) >>>> 8 -> (de,es,cz) >>>> >>>> I guess a GraphStage is what I need. Since the articles of each stream >>>> are ordered in the same way, I can group them together if I look at the >>>> head element of each stream. >>>> Here is my pseudocode. >>>> val sources: List[Source] //all inputStreams >>>> val minArticleNumber = sources.map(s => s.head.articleNumber).min >>>> val matchingStreams = sources.filter(_.head.articleNumber == >>>> minArticleNumber) >>>> >>>> >>>> //pick the current element of each of the matchingStreams, create a >>>> tuple and push it out of this stage and advance each of the matching >>>> streams at once. >>>> >>>> >>>> Does anyone have a running example of a GraphStage with customized >>>> logic? >>>> >>> I’m sorry that the documentation is not ready yet. You can find plenty >>> of implementations inside akka
Re: [akka-user] Merge matching elements of multiple streams into Tuples
Thanks for getting back to me. No need to be sorry - the docs of the akka-framework are imho the most qualitative docs of all frameworks I know. And quality takes time. After looking through the 'releasing-akka-stream-and-http-experimental-2.0-M1' branch I think I can take akka.stream.scaladsl.Concat as a blueprint for my solution, but it extends GraphStage[UniformFanInShape[T, T]] - a class neither me nor IDEA are able to find. Would you please help me out here? Florian Am Freitag, 20. November 2015 13:40:33 UTC+1 schrieb Patrik Nordwall: > > On Fri, Nov 20, 2015 at 12:03 PM, Florian Witteler < > florian@googlemail.com > wrote: > > I have an elasticsearch-database with all our articles. Each tenant has >> its own index. These indexes are denormalized (the common properties of an >> article are duplicated into each index) >> We are about to implement streaming of articles with akka http and use >> the elasticsearch-drivers' reactive streaming api. >> So far so good - set aside the enthusiasm of the awesomeness of handling >> breakpressure from the client through two modules back to the database ;-) >> >> For another project we need an export of the articles of *all* tenants >> in another fashion. >> In my module I'm going to query the articles for each tenant (and have a >> List of akka.stream.scaladsl.Source) >> >> The export has to be in a format like this: >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> The Details doesn't matter - just think of Article 1 .. n Localizations >> >> Now consider this constellation of articles in the streams. >> >> de 1 2 3 4 5 6 7 8 EOF >> es 1 2 4 5 6 8 EOF >> cz 1 2 4 6 7 8 EOF >> >> As you can see not every tenant has the same articles. >> I'd like to group several streams together and generate tuples of >> matching articles. What I want is s.th. like that. >> >> 1 -> (de,es,cz) >> 2 -> (de,es,cz) >> 3 -> (de) >> 4 -> (de,es,cz) >> 5 -> (de,es) >> 6 -> (de,es,cz) >> 7 -> (de,cz) >> 8 -> (de,es,cz) >> >> I guess a GraphStage is what I need. Since the articles of each stream >> are ordered in the same way, I can group them together if I look at the >> head element of each stream. >> Here is my pseudocode. >> val sources: List[Source] //all inputStreams >> val minArticleNumber = sources.map(s => s.head.articleNumber).min >> val matchingStreams = sources.filter(_.head.articleNumber == >> minArticleNumber) >> >> >> //pick the current element of each of the matchingStreams, create a tuple >> and push it out of this stage and advance each of the matching streams at >> once. >> >> >> Does anyone have a running example of a GraphStage with customized logic? >> > I’m sorry that the documentation is not ready yet. You can find plenty of > implementations inside akka-streams > <https://github.com/akka/akka/tree/releasing-akka-stream-and-http-experimental-2.0-M1/akka-stream/> > > itself. Search for extends GraphStage > > /Patrik > > Otherwise I'd like to ask for some pointers in the right direction. If I >> get it done, I can provide a running example for the documentation (or a >> github project) >> >> Thanks! >> Florian >> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to akka-user+...@googlegroups.com . >> To post to this group, send email to akka...@googlegroups.com >> . >> Visit this group at http://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > -- > > Patrik Nordwall > Typesafe <http://typesafe.com/> - Reactive apps on the JVM > Twitter: @patriknw > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Merge matching elements of multiple streams into Tuples
I have an elasticsearch-database with all our articles. Each tenant has its own index. These indexes are denormalized (the common properties of an article are duplicated into each index) We are about to implement streaming of articles with akka http and use the elasticsearch-drivers' reactive streaming api. So far so good - set aside the enthusiasm of the awesomeness of handling breakpressure from the client through two modules back to the database ;-) For another project we need an export of the articles of *all* tenants in another fashion. In my module I'm going to query the articles for each tenant (and have a List of akka.stream.scaladsl.Source) The export has to be in a format like this: The Details doesn't matter - just think of Article 1 .. n Localizations Now consider this constellation of articles in the streams. de 1 2 3 4 5 6 7 8 EOF es 1 2 4 5 6 8 EOF cz 1 2 4 6 7 8 EOF As you can see not every tenant has the same articles. I'd like to group several streams together and generate tuples of matching articles. What I want is s.th. like that. 1 -> (de,es,cz) 2 -> (de,es,cz) 3 -> (de) 4 -> (de,es,cz) 5 -> (de,es) 6 -> (de,es,cz) 7 -> (de,cz) 8 -> (de,es,cz) I guess a GraphStage is what I need. Since the articles of each stream are ordered in the same way, I can group them together if I look at the head element of each stream. Here is my pseudocode. val sources: List[Source] //all inputStreams val minArticleNumber = sources.map(s => s.head.articleNumber).min val matchingStreams = sources.filter(_.head.articleNumber == minArticleNumber) //pick the current element of each of the matchingStreams, create a tuple and push it out of this stage and advance each of the matching streams at once. Does anyone have a running example of a GraphStage with customized logic? Otherwise I'd like to ask for some pointers in the right direction. If I get it done, I can provide a running example for the documentation (or a github project) Thanks! Florian -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] dependency between tests when using ImplicitSender and forget to consume a response with any of the expect-methods
Makes sense to mix both styles. Thanks for the insight from inside the akka-team ;-) Keep up the good work! Am Sonntag, 23. November 2014 00:51:52 UTC+1 schrieb Konrad Malawski: > > We actually often find this rather useful - for example, if there comes a > response from the previous test case which I did not expect, > it could fail the 2nd test as an unexpected message may come in. This > helps to make sure there’s no spurious and additional messages sent around. > So yes, when using ImplicitSender, the assumption is that you want to make > sure you expect all messages. > > If however you want to have the tests separated completely - don’t use > implicit sender and use test probes instead: > > http://doc.akka.io/docs/akka/snapshot/scala/testing.html#using-multiple-probe-actors > > Both styles make complete sense and come down to your personal testing > style - we use both styles to test Akka itself actually. > > — ktoso > > > > On 21 November 2014 at 22:41:24, Florian Witteler ( > florian@googlemail.com ) wrote: > Hi! > > I was bitten by some special behaviour when testing my actorSystem with > the help of the ImplicitSender trait. > > I have two testmethods. In each of them, I send a message to an actor > wrapped in a TestActorRef. > In the first testcase, I don't consume the answer of the actor with one of > TestKit's expect-Methods, because I check its state directly > (testActorRef.underlyingActor) > This causes the 2nd testcase to fail, because the first message was > still in the receiving mailbox. > > Here is an example. > https://gist.github.com/FloWi/55572324f2d3adb3d51b > > Do you have any suggestion on how to make the tests independent of each > other? > Or am I just using the ImplicitSender in a wrong way? (every answer *must* > be consumed) > > Any insights are appreciated! > > Regards, > Florian > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to akka-user+...@googlegroups.com . > To post to this group, send email to akka...@googlegroups.com > . > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- > Konrad 'ktoso' Malawski > hAkker @ typesafe > http://akka.io > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] dependency between tests when using ImplicitSender and forget to consume a response with any of the expect-methods
Hi! I was bitten by some special behaviour when testing my actorSystem with the help of the ImplicitSender trait. I have two testmethods. In each of them, I send a message to an actor wrapped in a TestActorRef. In the first testcase, I don't consume the answer of the actor with one of TestKit's expect-Methods, because I check its state directly (testActorRef.underlyingActor) This causes the 2nd testcase to fail, because the first message was still in the receiving mailbox. Here is an example. https://gist.github.com/FloWi/55572324f2d3adb3d51b Do you have any suggestion on how to make the tests independent of each other? Or am I just using the ImplicitSender in a wrong way? (every answer *must* be consumed) Any insights are appreciated! Regards, Florian -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.