Progress! I modified the flow as follows, adding

1) withAttributes(Attributes.asyncBoundary) because Roland had a comment
about the sources needing an async boundary.
2) groupByChannel and mergeSubstreams
3) viaAsync before the db-calling transformation

Now I see the output folders and monthly files for each channel getting
created in parallel, and filled in an interval at a time. Pretty cool to
watch.

I need to experiment further and/or read some more about the async
boundaries - I'm not certain whether the withAttributes is in the right
place and whether I picked the right transformation for the viaAsync.

  def channelToBytesWritten(db: JdbcBackend.DatabaseDef, batchSize: Int,
requestedRange:InstantRange): Flow[TenantSiteChannelInfo,(String,Long),Unit]
= {

    val result = Flow[TenantSiteChannelInfo].withAttributes(Attributes.
asyncBoundary)

      .groupBy(100000, c => c.channelId)

      .via(Transformations.channelToChannelMonth(requestedRange))

      .viaAsync(Transformations.channelMonthToChannelMonthInterval(db,
batchSize))

      .via(Transformations.channelMonthIntervalToBytesWritten)

    .mergeSubstreams

    result

  }

I did get the following error, so I'm not out of the woods yet.

2016-02-05 11:04:29,305 ERROR FileSubscriber: - Tearing down FileSink(...)
due to upstream error

org.postgresql.util.PSQLException: ERROR: canceling statement due to user
request

at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(
QueryExecutorImpl.java:2161)

at org.postgresql.core.v3.QueryExecutorImpl.processResults(
QueryExecutorImpl.java:1890)

at org.postgresql.core.v3.QueryExecutorImpl.execute(
QueryExecutorImpl.java:255)

at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(
AbstractJdbc2Statement.java:560)

at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(
AbstractJdbc2Statement.java:417)

at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(
AbstractJdbc2Statement.java:410)

at com.zaxxer.hikari.proxy.PreparedStatementProxy.execute(
PreparedStatementProxy.java:44)

at
com.zaxxer.hikari.proxy.PreparedStatementJavassistProxy.execute(PreparedStatementJavassistProxy.java)

at slick.jdbc.StatementInvoker.results(StatementInvoker.scala:39)

at slick.jdbc.StatementInvoker.iteratorTo(StatementInvoker.scala:22)

at slick.jdbc.StreamingInvokerAction$class.emitStream(
StreamingInvokerAction.scala:28)

at
slick.driver.JdbcActionComponent$QueryActionExtensionMethodsImpl$$anon$1.emitStream(
JdbcActionComponent.scala:218)

at
slick.driver.JdbcActionComponent$QueryActionExtensionMethodsImpl$$anon$1.emitStream(
JdbcActionComponent.scala:218)

at slick.backend.DatabaseComponent$DatabaseDef$$anon$3.run(
DatabaseComponent.scala:285)

at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

On Fri, Feb 5, 2016 at 8:20 AM, Endre Varga <endre.va...@typesafe.com>
wrote:

> Hi Richard,
>
> I recommend you to draw a simple diagram. Draw groupBy as a box with
> multiple outputs, then draw the subsequent processing steps (Flow) as
> another box. Now collect back the edges into a "flatten" box. Now you can
> draw boundaries around various boxes (actors) and see how things relate.
>
> Then try it out in practice.
>
> -Endre
>
> On Fri, Feb 5, 2016 at 5:13 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Thank you. But parallelism *between stages* is not the same as
>> per-group-key parallelism, right?
>> In the original post I included the pipeline to provide context.
>> There are two database queries involved, (Channels, Intervals) both using
>> Slick streaming rather than Future-returning db.run(action). So to build
>> the stream of ChannelAndIntervals requires a flatMapConcat.
>> So I'm basically wondering whether
>> channels.groupBy(_.channelId).flatMap(channel =>
>> Sources.intervalsForChannel(...)) would result in parallel
>> intervals-for-channel queries.
>> I think the answer is no.
>> But since Endre said  groupBy(...).viaAsync(a) introduces an actor per
>> substream, I'm thinking maybe
>> groupBy(...).viaAsync(Transformations.dotheflatmapfromchannelstochannelintervals)
>> *would* result in parallel interval-for-channel queries.
>>
>> Sorry to be so dense.
>>
>>
>>
>> On Fri, Feb 5, 2016 at 7:45 AM, Roland Kuhn <goo...@rkuhn.info> wrote:
>>
>>> The correct statement would be: “groupBy does not automatically
>>> introduce any per-key parallelism.”
>>>
>>> There are several other combinators that can be used to introduce
>>> parallelism between processing stages (like viaAsync, mapAsync and
>>> potentially the flatMapX methods—if the provided sources declare async
>>> boundaries). We try to keep the combinators and concepts as orthogonal and
>>> composable as possible.
>>>
>>> Regards,
>>>
>>> Roland
>>>
>>> 5 feb 2016 kl. 16:24 skrev Richard Rodseth <rrods...@gmail.com>:
>>>
>>> In an effort to be more succinct :) Is this a true statement?
>>>
>>> "groupBy does not automatically introduce any *per-key* parallelism,
>>> unless followed by mapAsync"
>>>
>>> On Thu, Feb 4, 2016 at 3:05 PM, Richard Rodseth <rrods...@gmail.com>
>>> wrote:
>>>
>>>> I guess I'm still a bit confused by parallelism in akka streams, but
>>>> let me describe what I have.
>>>>
>>>> Tenants have Sites which have Channels which have Intervals (start end
>>>> value)
>>>>
>>>> My root source is a stream of TenantSiteChannelInfo (obtained from a
>>>> join of channels with their sites and tenants)
>>>>
>>>> I am successfully writing files, one per channel-month arranged by
>>>> tenant and site, eg.
>>>>
>>>> /archive/<tenantid>/<siteid>/<channelid>/<yearmonth>_<channelid>.txt
>>>>
>>>> using a flow like this
>>>>
>>>>   def channelToBytesWritten(db: JdbcBackend.DatabaseDef, batchSize:
>>>> Int, requestedRange:InstantRange): 
>>>> Flow[TenantSiteChannelInfo,(String,Long),Unit]
>>>> = {
>>>>
>>>>     val result = Flow[TenantSiteChannelInfo]
>>>>
>>>>       .via(Transformations.channelToChannelMonths(requestedRange)) //
>>>> uses flatMapConcat
>>>>
>>>>       .via(Transformations.channelMonthToChannelMonthIntervals(db,
>>>> batchSize)) // uses flatMapConcat
>>>>
>>>>       .via(Transformations.channelMonthIntervalToBytesWritten) // see
>>>> below
>>>>
>>>>     result
>>>>
>>>>   }
>>>>
>>>> Transformations.channelMonthIntervalToBytesWritten does a
>>>> groupBy.prefixAndTail(1).mapAsync as discussed in a recent thread, where
>>>> the key is (year-month,channelid)
>>>>
>>>> The result is I see the files showing up in the Finder in parallel,
>>>> which is OK and fun to watch. The number of distinct keys is rather large
>>>> but I could presumably throttle the source channels if necessary to address
>>>> that.
>>>>
>>>> But suppose I just/also wanted to process each *channel* (the very
>>>> first type of stream element) in parallel? Does inserting a
>>>> groupBy(_channelId).viaAsync at the beginning of this flow and merging
>>>> substreams at the end achieve that, or is running the latter half of the
>>>> flow inside mapAsync the only/best way?
>>>>
>>>> If the groupBy wouldn't achieve per-channel parallelism, is it fair to
>>>> say that groupBy only has utility if followed by some sort of "folding", as
>>>> in the word count example or the file writing above, or if the receiving
>>>> sink (shared by all substreams, an actor perhaps) can key off the same key
>>>> in some way.
>>>>
>>>> Thanks.
>>>>
>>>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>>
>>>
>>>
>>> *Dr. Roland Kuhn*
>>> *Akka Tech Lead*
>>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>>> twitter: @rolandkuhn
>>> <http://twitter.com/#!/rolandkuhn>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to