I have an Elasticsearch database with all our articles. Each tenant has its 
own index. These indexes are denormalized (the common properties of an 
article are duplicated into each index).
We are about to implement streaming of articles with Akka HTTP, using the 
reactive streams API of the Elasticsearch driver.
So far so good, leaving aside my enthusiasm about how awesome it is to have 
backpressure propagated from the client through two modules back to the 
database ;-)

For another project we need an export of the articles of *all* tenants in 
a different format.
In my module I'm going to query the articles for each tenant, which gives 
me a List of akka.stream.scaladsl.Source.

The export has to be in a format like this:
<articles>
    <article code="1">
        <!-- common stuff -->
        <localizations>
            <localization isocode="de">
                <!-- de-stuff -->
            </localization>
            <localization isocode="es">
                <!-- es-stuff -->
            </localization>
            <localization isocode="cz">
                <!-- cz-stuff -->
            </localization>
        </localizations>
    </article>
</articles>
The details don't matter - just think of one article with 1..n localizations.

Now consider this constellation of articles in the streams.

de 1 2 3 4 5 6 7 8 EOF
es 1 2   4 5 6   8 EOF
cz 1 2   4   6 7 8 EOF

As you can see, not every tenant has the same articles.
I'd like to group several streams together and generate tuples of matching 
articles. What I want is something like this:

  1 -> (de,es,cz)
  2 -> (de,es,cz)
  3 -> (de)
  4 -> (de,es,cz)
  5 -> (de,es)
  6 -> (de,es,cz)
  7 -> (de,cz)
  8 -> (de,es,cz)

I guess a GraphStage is what I need. Since the articles of each stream are 
ordered in the same way, I can group them together if I look at the head 
element of each stream.
Here is my pseudocode.
val sources: List[Source] // all input streams
// pseudocode: `head` stands for the element currently waiting on a stream
val minArticleNumber = sources.map(s => s.head.articleNumber).min
val matchingStreams = sources.filter(_.head.articleNumber == minArticleNumber)

// Pick the current element of each of the matchingStreams, create a tuple,
// push it out of this stage and advance all of the matching streams at once.
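To make the idea concrete, here is a minimal sketch of the same logic on plain Scala iterators instead of Akka Streams sources. It is not a GraphStage and involves no backpressure. It only shows the "look at every head, take the minimum, advance the matching streams" step from the pseudocode above. The names Article, GroupSortedStreams and groupByArticle are made up for this example, and it assumes every input is sorted ascending by article number.

```scala
object GroupSortedStreams {
  // Hypothetical stand-in for an indexed article; only the number matters here.
  final case class Article(number: Int, tenant: String)

  // Groups the elements of several inputs that are each sorted by article
  // number. BufferedIterator.head plays the role of "the current element".
  def groupByArticle(streams: List[Iterator[Article]]): Iterator[(Int, List[Article])] = {
    val inputs = streams.map(_.buffered)
    new Iterator[(Int, List[Article])] {
      def hasNext: Boolean = inputs.exists(_.hasNext)

      def next(): (Int, List[Article]) = {
        val live = inputs.filter(_.hasNext) // ignore inputs that reached EOF
        if (live.isEmpty) throw new NoSuchElementException("all inputs are exhausted")
        val minNumber = live.map(_.head.number).min
        val matching = live.filter(_.head.number == minNumber)
        // Emit one group and advance only the inputs that contributed to it.
        minNumber -> matching.map(_.next())
      }
    }
  }

  // The constellation from the example: de has all articles, es and cz have gaps.
  val de: Iterator[Article] = List(1, 2, 3, 4, 5, 6, 7, 8).map(Article(_, "de")).iterator
  val es: Iterator[Article] = List(1, 2, 4, 5, 6, 8).map(Article(_, "es")).iterator
  val cz: Iterator[Article] = List(1, 2, 4, 6, 7, 8).map(Article(_, "cz")).iterator

  // Article number paired with the tenants that have that article.
  val result: List[(Int, List[String])] =
    groupByArticle(List(de, es, cz)).map { case (n, as) => n -> as.map(_.tenant) }.toList

  def main(args: Array[String]): Unit =
    result.foreach { case (n, tenants) => println(s"$n -> (${tenants.mkString(",")})") }
}
```

In a GraphStage the same decision would presumably be made only once every inlet that has not completed has an element buffered, which is the part I am unsure how to wire up.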


Does anyone have a running example of a GraphStage with custom logic?
Otherwise I'd appreciate some pointers in the right direction. If I 
get it done, I can provide a running example for the documentation (or a 
GitHub project).

Thanks!
Florian
