Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Roland Kuhn
Instead of using a fold sink which materializes to a Future you’ll need to use 
a fold combinator which produces the result as its only value after the 
substream has completed, i.e. you keep the computation results within the 
streaming domain a bit longer instead of going to the Future domain 
immediately. Your 1.0 solution must have done something very similar, only with 
a different API (i.e. you folded the substreams and then probably used mapAsync 
to flatten the resulting stream of Futures).
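A minimal sketch of that shape against the 2.x API (assuming the Transaction, validate and TransactionMonoid definitions from the gist linked below, the scalaz-style |+| syntax the gist appears to use, and an illustrative MaxGroupCount constant):

transactions.map(validate)
  .groupBy(MaxGroupCount, _.accountNo)      // one substream per account
  .fold(TransactionMonoid.zero)(_ |+| _)    // emits one folded Transaction when the substream completes
  .mergeSubstreams                          // merge the per-account results back into one stream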

Regards,

Roland

> On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:
> 
> Roland -
> 
> I need to merge into a fold sink w/ a Monoid. Consider the following example 
> ..
> 
> case class Transaction(id: String, accountNo: String, debitCredit: 
> TransactionType, amount: Amount, date: Date = today)
> 
> and I have the following list of Transactions ..
> 
> val txns =
> Seq(
>   Transaction("t-1", "a-1", Debit, 1000),
>   Transaction("t-2", "a-2", Debit, 1000),
>   Transaction("t-3", "a-3", Credit, 1000),
>   Transaction("t-4", "a-1", Credit, 1000),
>   Transaction("t-5", "a-1", Debit, 1000),
>   Transaction("t-6", "a-2", Debit, 1000),
>   Transaction("t-7", "a-3", Credit, 1000),
>   Transaction("t-8", "a-3", Debit, 1000),
>   Transaction("t-9", "a-2", Credit, 1000),
>   Transaction("t-10", "a-2", Debit, 1000),
>   Transaction("t-11", "a-1", Credit, 1000),
>   Transaction("t-12", "a-3", Debit, 1000)
> )
> 
> I do a group by accountNo, which gives me 3 substreams. Each substream needs 
> to go into a fold sink where I fold using a Monoid. The Monoid has the logic 
> of merging transactions belonging to the same accountNo. The logic that you 
> suggest will not do this. I need to access the substreams separately. That's 
> why I did a groupBy (similar to SQL groupBy). And I could do this in the 1.0 
> version.
> 
> Any workaround that u suggest ?
> 
> Thanks.
> 
> On Tue, Apr 26, 2016 at 9:18 PM, Roland Kuhn  > wrote:
> If you need the results from the substreams you’ll have to merge them back 
> into the mainstream and aggregate them there:
> 
> transactions.groupBy(100, 
> ...).fold(...).mergeSubstreams.grouped(100).to(Sink.head)
> 
> Regards,
> 
> Roland
> 
>> On 26 Apr 2016, at 17:36, debasish wrote:
>> 
>> Viktor -
>> 
>> Here's the same stuff that works for akka-streams version 
>> akka-stream-experimental 1.0 RC4. .. 
>> https://gist.github.com/debasishg/4d596c1f26d4759ed65e281bb2e6fd2c 
>>  ..
>> 
>> The upgrade that I am having trouble with is defining netTxn (pls see the 
>> gist). The groupBy works like a charm in the older version. But in the newer 
>> version it returns a Subflow and I was stumped how to get hold of each of 
>> the substreams and fold them using a Monoid on a fold sink. Konrad's 
>> solution was to use the to method of subflow. But somehow it's not giving 
>> the desired result .. help ?
>> 
>> Thanks.
>> 
>> 
>> On Tuesday, April 26, 2016 at 8:46:11 PM UTC+5:30, √ wrote:
>> What are you expecting to be returned?
>> 
>> -- 
>> Cheers,
>> √
>> 
>> On Apr 26, 2016 3:49 PM, "Debasish Ghosh" > wrote:
>> Thanks Konrad for the pointer .. when I run the graph I get a NotUsed .. 
>> That's not what I get with the earlier implementation. Please have a look at 
>> the gist .. 
>> https://gist.github.com/debasishg/a42e867bb2bc8ad18243597178bbce93 
>>  .. what 
>> am I doing wrong.
>> 
>> Thanks.
>> 
>> On Tue, Apr 26, 2016 at 6:22 PM, Konrad Malawski wrote:
>> (I did a quick mock Transaction type, your Monoid should work fine there 
>> ofc).
>> 
>> -- Konrad
>> 
>> On Sunday, 24 April 2016 21:42:43 UTC+2, debasish wrote:
>> Hi -
>> 
>> I am trying to migrate some akka-streams code from an earlier version 
>> (akka-stream-experimental 1.0.RC4) .. please have a look at the following ..
>> 
>> /**
>>  * Create multiple streams out of a single stream. The stream "transactions" 
>> is being
>>  * demultiplexed into many streams split by account number. Each of the 
>> sub-streams are
>>  * then materialized to the fold sink "txnSink", which folds each of the 
>> transaction
>>  * substreams to compute the net value of the transaction for that account
>>  */
>>  val netTxn: Source[RunnableGraph[Future[Transaction]], Unit] = 
>>transactions.map(validate).groupBy(_.accountNo).map { case (a, s) => 
>> s.toMat(txnSink)(Keep.right) }
>> 
>> I am trying to create substreams based on a field accountNo within a 
>> Transaction. Then I pass each substream to toMat with a Sink which is a fold 
>> Sink defined as below ..
>> 
>> /**
>>  * Would like to fold transactions through monoid append
>>  */
>>  val txnSink: Sink[Transaction, Future[Transaction]] =
>>Sink.fold[Transaction, Transaction](TransactionMonoid.zero)(_ |+| _)
>> 
>> The idea is to apply a mono

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Debasish Ghosh
Roland -

The problem is I cannot access the substreams separately within the fold.
In the earlier solution the fold was on individual substreams (not on the
whole stream). Here the fold gets *all* the elements from all substreams
and I lose the ability to process substreams separately. Hence I lose the
ability to compose with a Monoid. Possibly I need to create a separate Map
and then do the aggregation. It will be clunky .. I am already starting to
feel the loss of the 1.0 API, which I thought was very idiomatic from
a groupBy point of view.
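A rough sketch of that Map-based aggregation (assuming the scalaz Map and Transaction Monoid instances from the gist; essentially what gets posted later in this thread):

transactions.map(validate)
  .fold(Map.empty[String, Transaction]) { (acc, txn) =>
    acc |+| Map(txn.accountNo -> txn)    // the Map Monoid merges values via the Transaction Monoid
  }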

Let me see if I can at all get a solution for this ..

regards.

On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  wrote:

> Instead of using a fold sink which materializes to a Future you’ll need to
> use a fold combinator which produces the result as its only value after the
> substream has completed, i.e. you keep the computation results within the
> streaming domain a bit longer instead of going to the Future domain
> immediately. Your 1.0 solution must have done something very similar, only
> with a different API (i.e. you folded the substreams and then probably used
> mapAsync to flatten the resulting stream of Futures).
>
> Regards,
>
> Roland
>
> On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:
>
> Roland -
>
> I need to merge into a fold sink w/ a Monoid. Consider the following
> example ..
>
> case class Transaction(id: String, accountNo: String, debitCredit:
> TransactionType, amount: Amount, date: Date = today)
>
> and I have the following list of Transactions ..
>
> val txns =
> Seq(
>   Transaction("t-1", "a-1", Debit, 1000),
>   Transaction("t-2", "a-2", Debit, 1000),
>   Transaction("t-3", "a-3", Credit, 1000),
>   Transaction("t-4", "a-1", Credit, 1000),
>   Transaction("t-5", "a-1", Debit, 1000),
>   Transaction("t-6", "a-2", Debit, 1000),
>   Transaction("t-7", "a-3", Credit, 1000),
>   Transaction("t-8", "a-3", Debit, 1000),
>   Transaction("t-9", "a-2", Credit, 1000),
>   Transaction("t-10", "a-2", Debit, 1000),
>   Transaction("t-11", "a-1", Credit, 1000),
>   Transaction("t-12", "a-3", Debit, 1000)
> )
>
> I do a group by accountNo, which gives me 3 substreams. Each substream
> needs to go into a fold sink where I fold using a Monoid. The Monoid has
> the logic of merging transactions belonging to the same accountNo. The
> logic that u suggest will not do this. I need to access the substreams
> separately. That's why I did a groupBy (similar to SQL groupBy). And I
> could do this in the 1.0 version.
>
> Any workaround that u suggest ?
>
> Thanks.
>
> On Tue, Apr 26, 2016 at 9:18 PM, Roland Kuhn  wrote:
>
>> If you need the results from the substreams you’ll have to merge them
>> back into the mainstream and aggregate them there:
>>
>> transactions.groupBy(100,
>> ...).fold(...).mergeSubstreams.grouped(100).to(Sink.head)
>>
>> Regards,
>>
>> Roland
>>
>> On 26 Apr 2016, at 17:36, debasish wrote:
>>
>> Viktor -
>>
>> Here's the same stuff that works for akka-streams version
>> akka-stream-experimental 1.0 RC4. ..
>> https://gist.github.com/debasishg/4d596c1f26d4759ed65e281bb2e6fd2c ..
>>
>> The upgrade that I am having trouble with is defining netTxn (pls see
>> the gist). The groupBy works like a charm in the older version. But in the
>> newer version it returns a Subflow and I was stumped how to get hold of
>> each of the substreams and fold them using a Monoid on a fold sink.
>> Konrad's solution was to use the to method of subflow. But somehow it's
>> not giving the desired result .. help ?
>>
>> Thanks.
>>
>>
>> On Tuesday, April 26, 2016 at 8:46:11 PM UTC+5:30, √ wrote:
>>>
>>> What are you expecting to be returned?
>>>
>>> --
>>> Cheers,
>>> √
>>> On Apr 26, 2016 3:49 PM, "Debasish Ghosh"  wrote:
>>>
 Thanks Konrad for the pointer .. when I run the graph I get a NotUsed ..
 That's not what I get with the earlier implementation. Please have a look
 at the gist ..
 https://gist.github.com/debasishg/a42e867bb2bc8ad18243597178bbce93 ..
 what am I doing wrong.

 Thanks.

 On Tue, Apr 26, 2016 at 6:22 PM, Konrad Malawski <
 konrad@typesafe.com> wrote:

> (I did a quick mock Transaction type, your Monoid should work fine
> there ofc).
>
> -- Konrad
>
> On Sunday, 24 April 2016 21:42:43 UTC+2, debasish wrote:
>
>> Hi -
>>
>> I am trying to migrate some akka-streams code from an earlier version
>> (akka-stream-experimental 1.0.RC4) .. please have a look at the 
>> following ..
>>
>> /**
>>  * Create multiple streams out of a single stream. The stream
>> "transactions" is being
>>  * demultiplexed into many streams split by account number. Each of
>> the sub-streams are
>>  * then materialized to the fold sink "txnSink", which folds each of
>> the transaction
>>  * substreams to compute the net value of the transaction for that
>> account
>>  */
>>  val netTxn: So

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Viktor Klang
Debasish,

The problem with groupBy was that it was too easy to create leaks and
silently broken solutions with it, but I sympathize with your situation, I
also felt the original as being more ergonomic. I think there is room for
improvement in the current solution.

-- 
Cheers,
√
On Apr 26, 2016 7:20 PM, "Debasish Ghosh"  wrote:

Roland -

The problem is I cannot access the substreams separately within the fold.
In the earlier solution the fold was on individual substreams (not on the
whole stream). Here the fold gets *all* the elements from all substreams
and I lose the ability to process substreams separately. Hence I lose the
ability to compose with a Monoid. Possibly I need to create a separate Map
and then do the aggregation. It will be clunky .. I am already starting to
feel the loss of the 1.0 API, which I thought was very idiomatic from
groupBy point of view.

Let me see if I can at all get a solution for this ..

regards.

On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  wrote:

> Instead of using a fold sink which materializes to a Future you’ll need to
> use a fold combinator which produces the result as its only value after the
> substream has completed, i.e. you keep the computation results within the
> streaming domain a bit longer instead of going to the Future domain
> immediately. Your 1.0 solution must have done something very similar, only
> with a different API (i.e. you folded the substreams and then probably used
> mapAsync to flatten the resulting stream of Futures).
>
> Regards,
>
> Roland
>
> On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:
>
> Roland -
>
> I need to merge into a fold sink w/ a Monoid. Consider the following
> example ..
>
> case class Transaction(id: String, accountNo: String, debitCredit:
> TransactionType, amount: Amount, date: Date = today)
>
> and I have the following list of Transactions ..
>
> val txns =
> Seq(
>   Transaction("t-1", "a-1", Debit, 1000),
>   Transaction("t-2", "a-2", Debit, 1000),
>   Transaction("t-3", "a-3", Credit, 1000),
>   Transaction("t-4", "a-1", Credit, 1000),
>   Transaction("t-5", "a-1", Debit, 1000),
>   Transaction("t-6", "a-2", Debit, 1000),
>   Transaction("t-7", "a-3", Credit, 1000),
>   Transaction("t-8", "a-3", Debit, 1000),
>   Transaction("t-9", "a-2", Credit, 1000),
>   Transaction("t-10", "a-2", Debit, 1000),
>   Transaction("t-11", "a-1", Credit, 1000),
>   Transaction("t-12", "a-3", Debit, 1000)
> )
>
> I do a group by accountNo, which gives me 3 substreams. Each substream
> needs to go into a fold sink where I fold using a Monoid. The Monoid has
> the logic of merging transactions belonging to the same accountNo. The
> logic that u suggest will not do this. I need to access the substreams
> separately. That's why I did a groupBy (similar to SQL groupBy). And I
> could do this in the 1.0 version.
>
> Any workaround that u suggest ?
>
> Thanks.
>
> On Tue, Apr 26, 2016 at 9:18 PM, Roland Kuhn  wrote:
>
>> If you need the results from the substreams you’ll have to merge them
>> back into the mainstream and aggregate them there:
>>
>> transactions.groupBy(100,
>> ...).fold(...).mergeSubstreams.grouped(100).to(Sink.head)
>>
>> Regards,
>>
>> Roland
>>
>> On 26 Apr 2016, at 17:36, debasish wrote:
>>
>> Viktor -
>>
>> Here's the same stuff that works for akka-streams version
>> akka-stream-experimental 1.0 RC4. ..
>> https://gist.github.com/debasishg/4d596c1f26d4759ed65e281bb2e6fd2c ..
>>
>> The upgrade that I am having trouble with is defining netTxn (pls see
>> the gist). The groupBy works like a charm in the older version. But in the
>> newer version it returns a Subflow and I was stumped how to get hold of
>> each of the substreams and fold them using a Monoid on a fold sink.
>> Konrad's solution was to use the to method of subflow. But somehow it's
>> not giving the desired result .. help ?
>>
>> Thanks.
>>
>>
>> On Tuesday, April 26, 2016 at 8:46:11 PM UTC+5:30, √ wrote:
>>>
>>> What are you expecting to be returned?
>>>
>>> --
>>> Cheers,
>>> √
>>> On Apr 26, 2016 3:49 PM, "Debasish Ghosh"  wrote:
>>>
 Thanks Konrad for the pointer .. when I run the graph I get a NotUsed ..
 That's not what I get with the earlier implementation. Please have a look
 at the gist ..
 https://gist.github.com/debasishg/a42e867bb2bc8ad18243597178bbce93 ..
 what am I doing wrong.

 Thanks.

 On Tue, Apr 26, 2016 at 6:22 PM, Konrad Malawski <
 konrad@typesafe.com> wrote:

> (I did a quick mock Transaction type, your Monoid should work fine
> there ofc).
>
> -- Konrad
>
> On Sunday, 24 April 2016 21:42:43 UTC+2, debasish wrote:
>
>> Hi -
>>
>> I am trying to migrate some akka-streams code from an earlier version
>> (akka-stream-experimental 1.0.RC4) .. please have a look at the 
>> following ..
>>
>> /**
>>  * Create multiple streams out of a single stream. The stream
>> "transactions"

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Debasish Ghosh
Thanks Viktor .. my main issue is that we lose the natural semantics of
groupBy as we learnt from SQL. We cannot access the substreams as separate
abstractions that groupBy creates.

On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang 
wrote:

> Debasish,
>
> The problem with groupBy was that it was too easy to create leaks and
> silently broken solutions with it, but I sympathize with your situation, I
> also felt the original as being more ergonomic. I think there is room for
> improvement in the current solution.
>
> --
> Cheers,
> √
> On Apr 26, 2016 7:20 PM, "Debasish Ghosh" 
> wrote:
>
> Roland -
>
> The problem is I cannot access the substreams separately within the fold.
> In the earlier solution the fold was on individual substreams (not on the
> whole stream). Here the fold gets *all* the elements from all substreams
> and I lose the ability to process substreams separately. Hence I lose the
> ability to compose with a Monoid. Possibly I need to create a separate Map
> and then do the aggregation. It will be clunky .. I am already starting to
> feel the loss of the 1.0 API, which I thought was very idiomatic from
> groupBy point of view.
>
> Let me see if I can at all get a solution for this ..
>
> regards.
>
> On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  wrote:
>
>> Instead of using a fold sink which materializes to a Future you’ll need
>> to use a fold combinator which produces the result as its only value after
>> the substream has completed, i.e. you keep the computation results within
>> the streaming domain a bit longer instead of going to the Future domain
>> immediately. Your 1.0 solution must have done something very similar, only
>> with a different API (i.e. you folded the substreams and then probably used
>> mapAsync to flatten the resulting stream of Futures).
>>
>> Regards,
>>
>> Roland
>>
>> On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:
>>
>> Roland -
>>
>> I need to merge into a fold sink w/ a Monoid. Consider the following
>> example ..
>>
>> case class Transaction(id: String, accountNo: String, debitCredit:
>> TransactionType, amount: Amount, date: Date = today)
>>
>> and I have the following list of Transactions ..
>>
>> val txns =
>> Seq(
>>   Transaction("t-1", "a-1", Debit, 1000),
>>   Transaction("t-2", "a-2", Debit, 1000),
>>   Transaction("t-3", "a-3", Credit, 1000),
>>   Transaction("t-4", "a-1", Credit, 1000),
>>   Transaction("t-5", "a-1", Debit, 1000),
>>   Transaction("t-6", "a-2", Debit, 1000),
>>   Transaction("t-7", "a-3", Credit, 1000),
>>   Transaction("t-8", "a-3", Debit, 1000),
>>   Transaction("t-9", "a-2", Credit, 1000),
>>   Transaction("t-10", "a-2", Debit, 1000),
>>   Transaction("t-11", "a-1", Credit, 1000),
>>   Transaction("t-12", "a-3", Debit, 1000)
>> )
>>
>> I do a group by accountNo, which gives me 3 substreams. Each substream
>> needs to go into a fold sink where I fold using a Monoid. The Monoid has
>> the logic of merging transactions belonging to the same accountNo. The
>> logic that u suggest will not do this. I need to access the substreams
>> separately. That's why I did a groupBy (similar to SQL groupBy). And I
>> could do this in the 1.0 version.
>>
>> Any workaround that u suggest ?
>>
>> Thanks.
>>
>> On Tue, Apr 26, 2016 at 9:18 PM, Roland Kuhn  wrote:
>>
>>> If you need the results from the substreams you’ll have to merge them
>>> back into the mainstream and aggregate them there:
>>>
>>> transactions.groupBy(100,
>>> ...).fold(...).mergeSubstreams.grouped(100).to(Sink.head)
>>>
>>> Regards,
>>>
>>> Roland
>>>
>>> On 26 Apr 2016, at 17:36, debasish wrote:
>>>
>>> Viktor -
>>>
>>> Here's the same stuff that works for akka-streams version
>>> akka-stream-experimental 1.0 RC4. ..
>>> https://gist.github.com/debasishg/4d596c1f26d4759ed65e281bb2e6fd2c ..
>>>
>>> The upgrade that I am having trouble with is defining netTxn (pls see
>>> the gist). The groupBy works like a charm in the older version. But in the
>>> newer version it returns a Subflow and I was stumped how to get hold of
>>> each of the substreams and fold them using a Monoid on a fold sink.
>>> Konrad's solution was to use the to method of subflow. But somehow it's
>>> not giving the desired result .. help ?
>>>
>>> Thanks.
>>>
>>>
>>> On Tuesday, April 26, 2016 at 8:46:11 PM UTC+5:30, √ wrote:

 What are you expecting to be returned?

 --
 Cheers,
 √
 On Apr 26, 2016 3:49 PM, "Debasish Ghosh"  wrote:

> Thanks Konrad for the pointer .. when I run the graph I get a NotUsed ..
> That's not what I get with the earlier implementation. Please have a look
> at the gist ..
> https://gist.github.com/debasishg/a42e867bb2bc8ad18243597178bbce93 ..
> what am I doing wrong.
>
> Thanks.
>
> On Tue, Apr 26, 2016 at 6:22 PM, Konrad Malawski <
> konrad@typesafe.com> wrote:
>
>> (I did a quick mock Transaction type, your Monoid should work fine
>> there ofc).

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Viktor Klang
If you can devise a safe version of groupBy that lets us preserve that nice
quality then I am more than all ears.

-- 
Cheers,
√
On Apr 26, 2016 8:02 PM, "Debasish Ghosh"  wrote:

> Thanks Viktor .. my main issue is that we lose the natural semantics of
> groupBy as we learnt from SQL. We cannot access the substreams as separate
> abstractions that groupBy creates.
>
> On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang 
> wrote:
>
>> Debasish,
>>
>> The problem with groupBy was that it was too easy to create leaks and
>> silently broken solutions with it, but I sympathize with your situation, I
>> also felt the original as being more ergonomic. I think there is room for
>> improvement in the current solution.
>>
>> --
>> Cheers,
>> √
>> On Apr 26, 2016 7:20 PM, "Debasish Ghosh" 
>> wrote:
>>
>> Roland -
>>
>> The problem is I cannot access the substreams separately within the fold.
>> In the earlier solution the fold was on individual substreams (not on the
>> whole stream). Here the fold gets *all* the elements from all substreams
>> and I lose the ability to process substreams separately. Hence I lose the
>> ability to compose with a Monoid. Possibly I need to create a separate Map
>> and then do the aggregation. It will be clunky .. I am already starting to
>> feel the loss of the 1.0 API, which I thought was very idiomatic from
>> groupBy point of view.
>>
>> Let me see if I can at all get a solution for this ..
>>
>> regards.
>>
>> On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  wrote:
>>
>>> Instead of using a fold sink which materializes to a Future you’ll need
>>> to use a fold combinator which produces the result as its only value after
>>> the substream has completed, i.e. you keep the computation results within
>>> the streaming domain a bit longer instead of going to the Future domain
>>> immediately. Your 1.0 solution must have done something very similar, only
>>> with a different API (i.e. you folded the substreams and then probably used
>>> mapAsync to flatten the resulting stream of Futures).
>>>
>>> Regards,
>>>
>>> Roland
>>>
>>> On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:
>>>
>>> Roland -
>>>
>>> I need to merge into a fold sink w/ a Monoid. Consider the following
>>> example ..
>>>
>>> case class Transaction(id: String, accountNo: String, debitCredit:
>>> TransactionType, amount: Amount, date: Date = today)
>>>
>>> and I have the following list of Transactions ..
>>>
>>> val txns =
>>> Seq(
>>>   Transaction("t-1", "a-1", Debit, 1000),
>>>   Transaction("t-2", "a-2", Debit, 1000),
>>>   Transaction("t-3", "a-3", Credit, 1000),
>>>   Transaction("t-4", "a-1", Credit, 1000),
>>>   Transaction("t-5", "a-1", Debit, 1000),
>>>   Transaction("t-6", "a-2", Debit, 1000),
>>>   Transaction("t-7", "a-3", Credit, 1000),
>>>   Transaction("t-8", "a-3", Debit, 1000),
>>>   Transaction("t-9", "a-2", Credit, 1000),
>>>   Transaction("t-10", "a-2", Debit, 1000),
>>>   Transaction("t-11", "a-1", Credit, 1000),
>>>   Transaction("t-12", "a-3", Debit, 1000)
>>> )
>>>
>>> I do a group by accountNo, which gives me 3 substreams. Each substream
>>> needs to go into a fold sink where I fold using a Monoid. The Monoid has
>>> the logic of merging transactions belonging to the same accountNo. The
>>> logic that u suggest will not do this. I need to access the substreams
>>> separately. That's why I did a groupBy (similar to SQL groupBy). And I
>>> could do this in the 1.0 version.
>>>
>>> Any workaround that u suggest ?
>>>
>>> Thanks.
>>>
>>> On Tue, Apr 26, 2016 at 9:18 PM, Roland Kuhn  wrote:
>>>
 If you need the results from the substreams you’ll have to merge them
 back into the mainstream and aggregate them there:

 transactions.groupBy(100,
 ...).fold(...).mergeSubstreams.grouped(100).to(Sink.head)

 Regards,

 Roland

 On 26 Apr 2016, at 17:36, debasish wrote:

 Viktor -

 Here's the same stuff that works for akka-streams version
 akka-stream-experimental 1.0 RC4. ..
 https://gist.github.com/debasishg/4d596c1f26d4759ed65e281bb2e6fd2c ..

 The upgrade that I am having trouble with is defining netTxn (pls see
 the gist). The groupBy works like a charm in the older version. But in the
 newer version it returns a Subflow and I was stumped how to get hold
 of each of the substreams and fold them using a Monoid on a fold sink.
 Konrad's solution was to use the to method of subflow. But somehow
 it's not giving the desired result .. help ?

 Thanks.


 On Tuesday, April 26, 2016 at 8:46:11 PM UTC+5:30, √ wrote:
>
> What are you expecting to be returned?
>
> --
> Cheers,
> √
> On Apr 26, 2016 3:49 PM, "Debasish Ghosh" 
> wrote:
>
>> Thanks Konrad for the pointer .. when I run the graph I get a NotUsed
>>  .. That's not what I get with the earlier implementation. Please
>> have a look at the gist ..
>>>

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Debasish Ghosh
Just for the sake of completeness, this works ..

transactions.map(validate)
.groupBy(MaxGroupCount, _.accountNo)
.fold(Map.empty[String, Transaction])((l, r) => l |+|
Map(r.accountNo -> r))
.mergeSubstreams

Since I cannot access the sub-streams I need to simulate this through a Map
..
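A hedged sketch of running this end to end (assuming an implicit ActorMaterializer in scope and the scalaz instances from the gist; the final Sink.fold only merges the single-entry Maps that mergeSubstreams emits, one per account):

val netByAccount: Future[Map[String, Transaction]] =
  transactions.map(validate)
    .groupBy(MaxGroupCount, _.accountNo)
    .fold(Map.empty[String, Transaction])((m, t) => m |+| Map(t.accountNo -> t))
    .mergeSubstreams
    .runWith(Sink.fold(Map.empty[String, Transaction])(_ |+| _))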

Thanks.

On Tue, Apr 26, 2016 at 11:32 PM, Debasish Ghosh 
wrote:

> Thanks Viktor .. my main issue is that we lose the natural semantics of
> groupBy as we learnt from SQL. We cannot access the substreams as separate
> abstractions that groupBy creates.
>
> On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang 
> wrote:
>
>> Debasish,
>>
>> The problem with groupBy was that it was too easy to create leaks and
>> silently broken solutions with it, but I sympathize with your situation, I
>> also felt the original as being more ergonomic. I think there is room for
>> improvement in the current solution.
>>
>> --
>> Cheers,
>> √
>> On Apr 26, 2016 7:20 PM, "Debasish Ghosh" 
>> wrote:
>>
>> Roland -
>>
>> The problem is I cannot access the substreams separately within the fold.
>> In the earlier solution the fold was on individual substreams (not on the
>> whole stream). Here the fold gets *all* the elements from all substreams
>> and I lose the ability to process substreams separately. Hence I lose the
>> ability to compose with a Monoid. Possibly I need to create a separate Map
>> and then do the aggregation. It will be clunky .. I am already starting to
>> feel the loss of the 1.0 API, which I thought was very idiomatic from
>> groupBy point of view.
>>
>> Let me see if I can at all get a solution for this ..
>>
>> regards.
>>
>> On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  wrote:
>>
>>> Instead of using a fold sink which materializes to a Future you’ll need
>>> to use a fold combinator which produces the result as its only value after
>>> the substream has completed, i.e. you keep the computation results within
>>> the streaming domain a bit longer instead of going to the Future domain
>>> immediately. Your 1.0 solution must have done something very similar, only
>>> with a different API (i.e. you folded the substreams and then probably used
>>> mapAsync to flatten the resulting stream of Futures).
>>>
>>> Regards,
>>>
>>> Roland
>>>
>>> 26 apr 2016 kl. 18:05 skrev Debasish Ghosh :
>>>
>>> Roland -
>>>
>>> I need to merge into a fold sink w/ a Monoid. Consider the following
>>> example ..
>>>
>>> case class Transaction(id: String, accountNo: String, debitCredit:
>>> TransactionType, amount: Amount, date: Date = today)
>>>
>>> and I have the following list of Transactions ..
>>>
>>> val txns =
>>> Seq(
>>>   Transaction("t-1", "a-1", Debit, 1000),
>>>   Transaction("t-2", "a-2", Debit, 1000),
>>>   Transaction("t-3", "a-3", Credit, 1000),
>>>   Transaction("t-4", "a-1", Credit, 1000),
>>>   Transaction("t-5", "a-1", Debit, 1000),
>>>   Transaction("t-6", "a-2", Debit, 1000),
>>>   Transaction("t-7", "a-3", Credit, 1000),
>>>   Transaction("t-8", "a-3", Debit, 1000),
>>>   Transaction("t-9", "a-2", Credit, 1000),
>>>   Transaction("t-10", "a-2", Debit, 1000),
>>>   Transaction("t-11", "a-1", Credit, 1000),
>>>   Transaction("t-12", "a-3", Debit, 1000)
>>> )
>>>
>>> I do a group by accountNo, which gives me 3 substreams. Each substream
>>> needs to go into a fold sink where I fold using a Monoid. The Monoid has
>>> the logic of merging transactions belonging to the same accountNo. The
>>> logic that u suggest will not do this. I need to access the substreams
>>> separately. That's why I did a groupBy (similar to SQL groupBy). And I
>>> could do this in the 1.0 version.
>>>
>>> Any workaround that u suggest ?
>>>
>>> Thanks.
>>>
>>> On Tue, Apr 26, 2016 at 9:18 PM, Roland Kuhn  wrote:
>>>
 If you need the results from the substreams you’ll have to merge them
 back into the mainstream and aggregate them there:

 transactions.groupBy(100,
 ...).fold(...).mergeSubstreams.grouped(100).to(Sink.head)

 Regards,

 Roland

 On 26 Apr 2016, at 17:36, debasish wrote:

 Viktor -

 Here's the same stuff that works for akka-streams version
 akka-stream-experimental 1.0 RC4. ..
 https://gist.github.com/debasishg/4d596c1f26d4759ed65e281bb2e6fd2c ..

 The upgrade that I am having trouble with is defining netTxn (pls see
 the gist). The groupBy works like a charm in the older version. But in the
 newer version it returns a Subflow and I was stumped how to get hold
 of each of the substreams and fold them using a Monoid on a fold sink.
 Konrad's solution was to use the to method of subflow. But somehow
 it's not giving the desired result .. help ?

 Thanks.


 On Tuesday, April 26, 2016 at 8:46:11 PM UTC+5:30, √ wrote:
>
> What are you expecting to be returned?
>
> --
> Cheers,
> √
> On Apr 26, 2016 3:49 PM, "Debasish Gh

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Debasish Ghosh
Actually I realized just now that I don't need the groupBy and mergeSubstreams
for this .. Just the following will also do ..

transactions.map(validate)
.fold(Map.empty[String, Transaction])((l, r) => l |+|
Map(r.accountNo -> r))

And the example in the Streams Cookbook for counting words or reduce by key
can also be done as the following without using groupBy ..

val counts = words
  .fold(Map.empty[String, Int])((l, r) => l |+| Map(r -> 1))
  .toMat(sink)(Keep.right)
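A self-contained variant of that word count without the scalaz |+| (the word list, system name and object name are only illustrative):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

object WordCount extends App {
  implicit val system = ActorSystem("wordcount")
  implicit val materializer = ActorMaterializer()

  val words = Source(List("akka", "streams", "akka", "fold"))

  // one sequential fold over the whole stream, no groupBy involved
  val counts: Future[Map[String, Int]] =
    words
      .fold(Map.empty[String, Int])((acc, w) => acc.updated(w, acc.getOrElse(w, 0) + 1))
      .runWith(Sink.head)

  import system.dispatcher
  counts.foreach { m => println(m); system.terminate() }
}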

Am I missing something ? Or is there any advantage of using groupBy with
mergeSubstreams over the above method ?

Thanks.


On Tue, Apr 26, 2016 at 11:44 PM, Debasish Ghosh 
wrote:

> Just for the sake of completeness, this works ..
>
> transactions.map(validate)
> .groupBy(MaxGroupCount, _.accountNo)
> .fold(Map.empty[String, Transaction])((l, r) => l |+|
> Map(r.accountNo -> r))
> .mergeSubstreams
>
> Since I cannot access the sub-streams I need to simulate this through a
> Map ..
>
> Thanks.
>
> On Tue, Apr 26, 2016 at 11:32 PM, Debasish Ghosh  > wrote:
>
>> Thanks Viktor .. my main issue is that we lose the natural semantics of
>> groupBy as we learnt from SQL. We cannot access the substreams as separate
>> abstractions that groupBy creates.
>>
>> On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang 
>> wrote:
>>
>>> Debasish,
>>>
>>> The problem with groupBy was that it was too easy to create leaks and
>>> silently broken solutions with it, but I sympathize with your situation, I
>>> also felt the original as being more ergonomic. I think there is room for
>>> improvement in the current solution.
>>>
>>> --
>>> Cheers,
>>> √
>>> On Apr 26, 2016 7:20 PM, "Debasish Ghosh" 
>>> wrote:
>>>
>>> Roland -
>>>
>>> The problem is I cannot access the substreams separately within the
>>> fold. In the earlier solution the fold was on individual substreams (not on
>>> the whole stream). Here the fold gets *all* the elements from all
>>> substreams and I lose the ability to process substreams separately. Hence I
>>> lose the ability to compose with a Monoid. Possibly I need to create a
>>> separate Map and then do the aggregation. It will be clunky .. I am already
>>> starting to feel the loss of the 1.0 API, which I thought was very
>>> idiomatic from groupBy point of view.
>>>
>>> Let me see if I can at all get a solution for this ..
>>>
>>> regards.
>>>
>>> On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  wrote:
>>>
 Instead of using a fold sink which materializes to a Future you’ll need
 to use a fold combinator which produces the result as its only value after
 the substream has completed, i.e. you keep the computation results within
 the streaming domain a bit longer instead of going to the Future domain
 immediately. Your 1.0 solution must have done something very similar, only
 with a different API (i.e. you folded the substreams and then probably used
 mapAsync to flatten the resulting stream of Futures).

 Regards,

 Roland

 On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:

 Roland -

 I need to merge into a fold sink w/ a Monoid. Consider the following
 example ..

 case class Transaction(id: String, accountNo: String, debitCredit:
 TransactionType, amount: Amount, date: Date = today)

 and I have the following list of Transactions ..

 val txns =
 Seq(
   Transaction("t-1", "a-1", Debit, 1000),
   Transaction("t-2", "a-2", Debit, 1000),
   Transaction("t-3", "a-3", Credit, 1000),
   Transaction("t-4", "a-1", Credit, 1000),
   Transaction("t-5", "a-1", Debit, 1000),
   Transaction("t-6", "a-2", Debit, 1000),
   Transaction("t-7", "a-3", Credit, 1000),
   Transaction("t-8", "a-3", Debit, 1000),
   Transaction("t-9", "a-2", Credit, 1000),
   Transaction("t-10", "a-2", Debit, 1000),
   Transaction("t-11", "a-1", Credit, 1000),
   Transaction("t-12", "a-3", Debit, 1000)
 )

 I do a group by accountNo, which gives me 3 substreams. Each substream
 needs to go into a fold sink where I fold using a Monoid. The Monoid has
 the logic of merging transactions belonging to the same accountNo. The
 logic that u suggest will not do this. I need to access the substreams
 separately. That's why I did a groupBy (similar to SQL groupBy). And I
 could do this in the 1.0 version.

 Any workaround that u suggest ?

 Thanks.

 On Tue, Apr 26, 2016 at 9:18 PM, Roland Kuhn  wrote:

> If you need the results from the substreams you’ll have to merge them
> back into the mainstream and aggregate them there:
>
> transactions.groupBy(100,
> ...).fold(...).mergeSubstreams.grouped(100).to(Sink.head)
>
> Regards,
>
> Roland
>
> On 26 Apr 2016, at 17:36, debasish wrote:
>
> Viktor -
>
> Here's the same stuff that works fo

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Viktor Klang
The fold will be sequential

On Tue, Apr 26, 2016 at 9:20 PM, Debasish Ghosh 
wrote:

> Actually I realized just now that I don't need the groupBy and
> mergeSubstreams for this .. Just the following will also do ..
>
> transactions.map(validate)
> .fold(Map.empty[String, Transaction])((l, r) => l |+|
> Map(r.accountNo -> r))
>
> And the example in the Streams Cookbook for counting words or reduce by
> key can also be done as the following without using groupBy ..
>
> val counts = words
>   .fold(Map.empty[String, Int])((l, r) => l |+| Map(r -> 1))
>   .toMat(sink)(Keep.right)
>
> Am I missing something ? Or is there any advantage of using groupBy with
> mergeSubstreams over the above method ?
>
> Thanks.
>
>
> On Tue, Apr 26, 2016 at 11:44 PM, Debasish Ghosh  > wrote:
>
>> Just for the sake of completeness, this works ..
>>
>> transactions.map(validate)
>> .groupBy(MaxGroupCount, _.accountNo)
>> .fold(Map.empty[String, Transaction])((l, r) => l |+|
>> Map(r.accountNo -> r))
>> .mergeSubstreams
>>
>> Since I cannot access the sub-streams I need to simulate this through a
>> Map ..
>>
>> Thanks.
>>
>> On Tue, Apr 26, 2016 at 11:32 PM, Debasish Ghosh <
>> ghosh.debas...@gmail.com> wrote:
>>
>>> Thanks Viktor .. my main issue is that we lose the natural semantics of
>>> groupBy as we learnt from SQL. We cannot access the substreams as separate
>>> abstractions that groupBy creates.
>>>
>>> On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang 
>>> wrote:
>>>
 Debasish,

 The problem with groupBy was that it was too easy to create leaks and
 silently broken solutions with it, but I sympathize with your situation, I
 also felt the original as being more ergonomic. I think there is room for
 improvement in the current solution.

 --
 Cheers,
 √
 On Apr 26, 2016 7:20 PM, "Debasish Ghosh" 
 wrote:

 Roland -

 The problem is I cannot access the substreams separately within the
 fold. In the earlier solution the fold was on individual substreams (not on
 the whole stream). Here the fold gets *all* the elements from all
 substreams and I lose the ability to process substreams separately. Hence I
 lose the ability to compose with a Monoid. Possibly I need to create a
 separate Map and then do the aggregation. It will be clunky .. I am already
 starting to feel the loss of the 1.0 API, which I thought was very
 idiomatic from groupBy point of view.

 Let me see if I can at all get a solution for this ..

 regards.

 On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn 
 wrote:

> Instead of using a fold sink which materializes to a Future you’ll
> need to use a fold combinator which produces the result as its only value
> after the substream has completed, i.e. you keep the computation results
> within the streaming domain a bit longer instead of going to the Future
> domain immediately. Your 1.0 solution must have done something very
> similar, only with a different API (i.e. you folded the substreams and 
> then
> probably used mapAsync to flatten the resulting stream of Futures).
>
> Regards,
>
> Roland
>
> On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:
>
> Roland -
>
> I need to merge into a fold sink w/ a Monoid. Consider the following
> example ..
>
> case class Transaction(id: String, accountNo: String, debitCredit:
> TransactionType, amount: Amount, date: Date = today)
>
> and I have the following list of Transactions ..
>
> val txns =
> Seq(
>   Transaction("t-1", "a-1", Debit, 1000),
>   Transaction("t-2", "a-2", Debit, 1000),
>   Transaction("t-3", "a-3", Credit, 1000),
>   Transaction("t-4", "a-1", Credit, 1000),
>   Transaction("t-5", "a-1", Debit, 1000),
>   Transaction("t-6", "a-2", Debit, 1000),
>   Transaction("t-7", "a-3", Credit, 1000),
>   Transaction("t-8", "a-3", Debit, 1000),
>   Transaction("t-9", "a-2", Credit, 1000),
>   Transaction("t-10", "a-2", Debit, 1000),
>   Transaction("t-11", "a-1", Credit, 1000),
>   Transaction("t-12", "a-3", Debit, 1000)
> )
>
> I do a group by accountNo, which gives me 3 substreams. Each
> substream needs to go into a fold sink where I fold using a Monoid. The
> Monoid has the logic of merging transactions belonging to the same
> accountNo. The logic that u suggest will not do this. I need to access the
> substreams separately. That's why I did a groupBy (similar to SQL 
> groupBy).
> And I could do this in the 1.0 version.
>
> Any workaround that u suggest ?
>
> Thanks.
>
> On Tue, Apr 26, 2016 at 9:18 PM, Roland Kuhn 
> wrote:
>
>> If you need the results from the substreams you’ll have to merge them
>> back into the mainstream and

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Roland Kuhn

> On 26 Apr 2016, at 20:14, Debasish Ghosh wrote:
> 
> Just for the sake of completeness, this works ..
> 
> transactions.map(validate)
> .groupBy(MaxGroupCount, _.accountNo)
> .fold(Map.empty[String, Transaction])((l, r) => l |+| 
> Map(r.accountNo -> r))
> .mergeSubstreams
> 

Yes, this is what I proposed a while ago :-) (more or less, see below)

> Since I cannot access the sub-streams I need to simulate this through a Map ..

I don’t fully understand: the fold is running on each of the substreams, there 
is nothing that is inaccessible. The only difference to the old API is that the 
key is not available in pre-extracted form (i.e. things are not wrapped in 
tuples). You also don’t need to use a Map, you could also keep the last 
r.accountNo in the first position of a tuple and the current Transaction in the 
second—the result then looks exactly the same as it did with streams 1.0. OTOH 
I suspect that your monoid leaves the accountNo intact, so there is not even a 
need to allocate those tuples.
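A small sketch of that tuple variant (assuming the 2.x groupBy and the Transaction Monoid from the gist; MaxGroupCount is illustrative):

transactions.map(validate)
  .groupBy(MaxGroupCount, _.accountNo)
  .fold(("", TransactionMonoid.zero)) { case ((_, acc), txn) =>
    (txn.accountNo, acc |+| txn)    // carry the key alongside the folded value, as in the 1.0 shape
  }
  .mergeSubstreams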

Would you please explain how our groupBy loses the semantics from SQL? I’m a 
bit rusty, but the group “name” is also not made available in a dedicated 
fashion there, AFAIR. The current shape of the API was inspired by BigData 
analytics tools that model this operation in the same way, because the previous 
scheme was simply too powerful (it allowed arbitrary dynamic treatment of each 
of the substreams—SQL grouping is also homogenous and not different for each 
group). Dynamic elements also defeat optimizations like fusing and graph 
simplification.

Regards,

Roland

> 
> Thanks.
> 
> On Tue, Apr 26, 2016 at 11:32 PM, Debasish Ghosh  > wrote:
> Thanks Viktor .. my main issue is that we lose the natural semantics of 
> groupBy as we learnt from SQL. We cannot access the substreams as separate 
> abstractions that groupBy creates.
> 
> On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang  > wrote:
> Debasish,
> 
> The problem with groupBy was that it was too easy to create leaks and 
> silently broken solutions with it, but I sympathize with your situation, I 
> also felt the original as being more ergonomic. I think there is room for 
> improvement in the current solution.
> 
> -- 
> Cheers,
> √
> On Apr 26, 2016 7:20 PM, "Debasish Ghosh"  > wrote:
> Roland -
> 
> The problem is I cannot access the substreams separately within the fold. In 
> the earlier solution the fold was on individual substreams (not on the whole 
> stream). Here the fold gets *all* the elements from all substreams and I lose 
> the ability to process substreams separately. Hence I lose the ability to 
> compose with a Monoid. Possibly I need to create a separate Map and then do 
> the aggregation. It will be clunky .. I am already starting to feel the loss 
> of the 1.0 API, which I thought was very idiomatic from groupBy point of view.
> 
> Let me see if I can at all get a solution for this ..
> 
> regards.
> 
> On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  > wrote:
> Instead of using a fold sink which materializes to a Future you’ll need to 
> use a fold combinator which produces the result as its only value after the 
> substream has completed, i.e. you keep the computation results within the 
> streaming domain a bit longer instead of going to the Future domain 
> immediately. Your 1.0 solution must have done something very similar, only 
> with a different API (i.e. you folded the substreams and then probably used 
> mapAsync to flatten the resulting stream of Futures).
> 
> Regards,
> 
> Roland
> 
>> On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:
>> 
>> Roland -
>> 
>> I need to merge into a fold sink w/ a Monoid. Consider the following example 
>> ..
>> 
>> case class Transaction(id: String, accountNo: String, debitCredit: 
>> TransactionType, amount: Amount, date: Date = today)
>> 
>> and I have the following list of Transactions ..
>> 
>> val txns =
>> Seq(
>>   Transaction("t-1", "a-1", Debit, 1000),
>>   Transaction("t-2", "a-2", Debit, 1000),
>>   Transaction("t-3", "a-3", Credit, 1000),
>>   Transaction("t-4", "a-1", Credit, 1000),
>>   Transaction("t-5", "a-1", Debit, 1000),
>>   Transaction("t-6", "a-2", Debit, 1000),
>>   Transaction("t-7", "a-3", Credit, 1000),
>>   Transaction("t-8", "a-3", Debit, 1000),
>>   Transaction("t-9", "a-2", Credit, 1000),
>>   Transaction("t-10", "a-2", Debit, 1000),
>>   Transaction("t-11", "a-1", Credit, 1000),
>>   Transaction("t-12", "a-3", Debit, 1000)
>> )
>> 
>> I do a group by accountNo, which gives me 3 substreams. Each substream needs 
>> to go into a fold sink where I fold using a Monoid. The Monoid has the logic 
>> of merging transactions belonging to the same accountNo. The logic that u 
>> suggest will not do this. I need to access the s

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Debasish Ghosh
Thanks Roland a lot for the clarification .. I can do this ..

val netTxn: RunnableGraph[Future[akka.Done]] =
transactions.map(validate)
.groupBy(MaxGroupCount, _.accountNo)
.fold(TransactionMonoid.zero)(_ |+| _)
.mergeSubstreams

Only difference with the 1.0 API is that I cannot do the fold in the Sink -
instead I have to do it here. I don't mind, it may even be better this way.
Just one clarification .. In the above snippet are the folds on individual
substreams done in parallel ?

Thanks again for the help ..
Regards.


On Wed, Apr 27, 2016 at 2:53 AM, Roland Kuhn  wrote:

>
> On 26 Apr 2016, at 20:14, Debasish Ghosh wrote:
>
> Just for the sake of completeness, this works ..
>
> transactions.map(validate)
> .groupBy(MaxGroupCount, _.accountNo)
> .fold(Map.empty[String, Transaction])((l, r) => l |+|
> Map(r.accountNo -> r))
> .mergeSubstreams
>
>
> Yes, this is what I proposed a while ago :-) (more or less, see below)
>
> Since I cannot access the sub-streams I need to simulate this through a
> Map ..
>
>
> I don’t fully understand: the fold is running on each of the substreams,
> there is nothing that is inaccessible. The only difference to the old API
> is that the key is not available in pre-extracted form (i.e. things are not
> wrapped in tuples). You also don’t need to use a Map, you could also keep
> the last r.accountNo in the first position of a tuple and the current
> Transaction in the second—the result then looks exactly the same as it did
> with streams 1.0. OTOH I suspect that your monoid leaves the accountNo
> intact, so there is not even a need to allocate those tuples.
>
> Would you please explain how our groupBy loses the semantics from SQL? I’m
> a bit rusty, but the group “name” is also not made available in a dedicated
> fashion there, AFAIR. The current shape of the API was inspired by BigData
> analytics tools that model this operation in the same way, because the
> previous scheme was simply too powerful (it allowed arbitrary dynamic
> treatment of each of the substreams—SQL grouping is also homogenous and not
> different for each group). Dynamic elements also defeat optimizations like
> fusing and graph simplification.
>
> Regards,
>
> Roland
>
>
> Thanks.
>
> On Tue, Apr 26, 2016 at 11:32 PM, Debasish Ghosh  > wrote:
>
>> Thanks Viktor .. my main issue is that we lose the natural semantics of
>> groupBy as we learnt from SQL. We cannot access the substreams as separate
>> abstractions that groupBy creates.
>>
>> On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang 
>> wrote:
>>
>>> Debasish,
>>>
>>> The problem with groupBy was that it was too easy to create leaks and
>>> silently broken solutions with it, but I sympathize with your situation, I
>>> also felt the original as being more ergonomic. I think there is room for
>>> improvement in the current solution.
>>> --
>>> Cheers,
>>> √
>>> On Apr 26, 2016 7:20 PM, "Debasish Ghosh" 
>>> wrote:
>>>
>>> Roland -
>>>
>>> The problem is I cannot access the substreams separately within the
>>> fold. In the earlier solution the fold was on individual substreams (not on
>>> the whole stream). Here the fold gets *all* the elements from all
>>> substreams and I lose the ability to process substreams separately. Hence I
>>> lose the ability to compose with a Monoid. Possibly I need to create a
>>> separate Map and then do the aggregation. It will be clunky .. I am already
>>> starting to feel the loss of the 1.0 API, which I thought was very
>>> idiomatic from groupBy point of view.
>>>
>>> Let me see if I can at all get a solution for this ..
>>>
>>> regards.
>>>
>>> On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  wrote:
>>>
 Instead of using a fold sink which materializes to a Future you’ll need
 to use a fold combinator which produces the result as its only value after
 the substream has completed, i.e. you keep the computation results within
 the streaming domain a bit longer instead of going to the Future domain
 immediately. Your 1.0 solution must have done something very similar, only
 with a different API (i.e. you folded the substreams and then probably used
 mapAsync to flatten the resulting stream of Futures).

 Regards,

 Roland

 On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:

 Roland -

 I need to merge into a fold sink w/ a Monoid. Consider the following
 example ..

 case class Transaction(id: String, accountNo: String, debitCredit:
 TransactionType, amount: Amount, date: Date = today)

 and I have the following list of Transactions ..

 val txns =
 Seq(
   Transaction("t-1", "a-1", Debit, 1000),
   Transaction("t-2", "a-2", Debit, 1000),
   Transaction("t-3", "a-3", Credit, 1000),
   Transaction("t-4", "a-1", Credit, 1000),
   Transaction("t-5", "a-1", Debit, 1000),
   Transaction("t-6", "a-2",

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Roland Kuhn
Hi Debasish,

The current fusing algorithm is called «aggressive» for a reason: it will fuse 
everything it can. To run the subflows in parallel you will have to add a 
.async after the fold. But that will only be beneficial if the monoid’s |+| is 
rather expensive.
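As a hedged sketch (names as in the earlier snippets), the boundary goes right after the per-substream fold:

transactions.map(validate)
  .groupBy(MaxGroupCount, _.accountNo)
  .fold(TransactionMonoid.zero)(_ |+| _)
  .async              // asynchronous boundary so the substream folds are not fused together
  .mergeSubstreams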

Regards, Roland 

Sent from my iPhone

> On 27 Apr 2016, at 06:10, Debasish Ghosh  wrote:
> 
> Thanks Roland a lot for the clarification .. I can do this ..
> 
> val netTxn: RunnableGraph[Future[akka.Done]] = 
> transactions.map(validate)
> .groupBy(MaxGroupCount, _.accountNo)
> .fold(TransactionMonoid.zero)(_ |+| _)
> .mergeSubstreams
> 
> Only difference with the 1.0 API is that I cannot do the fold in the Sink - 
> instead I have to do it here. I don't mind, it may even be better this way. 
> Just one clarification .. In the above snippet are the folds on individual 
> substreams done in parallel ?
> 
> Thanks again for the help ..
> Regards.
> 
> 
>> On Wed, Apr 27, 2016 at 2:53 AM, Roland Kuhn  wrote:
>> 
>>> On 26 Apr 2016, at 20:14, Debasish Ghosh wrote:
>>> 
>>> Just for the sake of completeness, this works ..
>>> 
>>> transactions.map(validate)
>>> .groupBy(MaxGroupCount, _.accountNo)
>>> .fold(Map.empty[String, Transaction])((l, r) => l |+| 
>>> Map(r.accountNo -> r))
>>> .mergeSubstreams
>> 
>> Yes, this is what I proposed a while ago :-) (more or less, see below)
>> 
>>> Since I cannot access the sub-streams I need to simulate this through a Map 
>>> ..
>> 
>> I don’t fully understand: the fold is running on each of the substreams, 
>> there is nothing that is inaccessible. The only difference to the old API is 
>> that the key is not available in pre-extracted form (i.e. things are not 
>> wrapped in tuples). You also don’t need to use a Map, you could also keep 
>> the last r.accountNo in the first position of a tuple and the current 
>> Transaction in the second—the result then looks exactly the same as it did 
>> with streams 1.0. OTOH I suspect that your monoid leaves the accountNo 
>> intact, so there is not even a need to allocate those tuples.
>> 
>> Would you please explain how our groupBy loses the semantics from SQL? I’m a 
>> bit rusty, but the group “name” is also not made available in a dedicated 
>> fashion there, AFAIR. The current shape of the API was inspired by BigData 
>> analytics tools that model this operation in the same way, because the 
>> previous scheme was simply too powerful (it allowed arbitrary dynamic 
>> treatment of each of the substreams—SQL grouping is also homogenous and not 
>> different for each group). Dynamic elements also defeat optimizations like 
>> fusing and graph simplification.
>> 
>> Regards,
>> 
>> Roland
>> 
>>> 
>>> Thanks.
>>> 
 On Tue, Apr 26, 2016 at 11:32 PM, Debasish Ghosh 
  wrote:
 Thanks Viktor .. my main issue is that we lose the natural semantics of 
 groupBy as we learnt from SQL. We cannot access the substreams as separate 
 abstractions that groupBy creates.
 
> On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang  
> wrote:
> Debasish,
> 
> The problem with groupBy was that it was too easy to create leaks and 
> silently broken solutions with it, but I sympathize with your situation, 
> I also felt the original as being more ergonomic. I think there is room 
> for improvement in the current solution.
> 
> -- 
> Cheers,
> √
> On Apr 26, 2016 7:20 PM, "Debasish Ghosh"  
> wrote:
> Roland -
> 
> The problem is I cannot access the substreams separately within the fold. 
> In the earlier solution the fold was on individual substreams (not on the 
> whole stream). Here the fold gets *all* the elements from all substreams 
> and I lose the ability to process substreams separately. Hence I lose the 
> ability to compose with a Monoid. Possibly I need to create a separate 
> Map and then do the aggregation. It will be clunky .. I am already 
> starting to feel the loss of the 1.0 API, which I thought was very 
> idiomatic from groupBy point of view.
> 
> Let me see if I can at all get a solution for this ..
> 
> regards.
> 
>> On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  wrote:
>> Instead of using a fold sink which materializes to a Future you’ll need 
>> to use a fold combinator which produces the result as its only value 
>> after the substream has completed, i.e. you keep the computation results 
>> within the streaming domain a bit longer instead of going to the Future 
>> domain immediately. Your 1.0 solution must have done something very 
>> similar, only with a different API (i.e. you folded the substreams and 
>> then probably used mapAsync to flatten the resulting stream of Futures).
>> 
>> Regards,
>> 
>> Roland
>> 
>>> On 26 Apr 2016, at 18:05, Debasish Ghosh wrote:
>>> 
>>> Roland -
>>>

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-26 Thread Debasish Ghosh
Thanks for the clarification. A good point to keep in mind.

regards.

On Wed, Apr 27, 2016 at 11:22 AM, Roland Kuhn  wrote:

> Hi Debasish,
>
> The current fusing algorithm is called «aggressive» for a reason: it will
> fuse everything it can. To run the subflows in parallel you will have to
> add a .async after the fold. But that will only be beneficial if the
> monoid’s |+| is rather expensive.
>
> Regards, Roland
>
> Sent from my iPhone
>
> On 27 Apr 2016, at 06:10, Debasish Ghosh  wrote:
>
> Thanks Roland a lot for the clarification .. I can do this ..
>
> val netTxn: RunnableGraph[Future[akka.Done]] =
> transactions.map(validate)
> .groupBy(MaxGroupCount, _.accountNo)
> .fold(TransactionMonoid.zero)(_ |+| _)
> .mergeSubstreams
>
> Only difference with the 1.0 API is that I cannot do the fold in the Sink
> - instead I have to do it here. I don't mind, it may even be better this
> way. Just one clarification .. In the above snippet are the folds on
> individual substreams done in parallel ?
>
> Thanks again for the help ..
> Regards.
>
>
> On Wed, Apr 27, 2016 at 2:53 AM, Roland Kuhn  wrote:
>
>>
>> On 26 Apr 2016, at 20:14, Debasish Ghosh wrote:
>>
>> Just for the sake of completeness, this works ..
>>
>> transactions.map(validate)
>> .groupBy(MaxGroupCount, _.accountNo)
>> .fold(Map.empty[String, Transaction])((l, r) => l |+|
>> Map(r.accountNo -> r))
>> .mergeSubstreams
>>
>>
>> Yes, this is what I proposed a while ago :-) (more or less, see below)
>>
>> Since I cannot access the sub-streams I need to simulate this through a
>> Map ..
>>
>>
>> I don’t fully understand: the fold is running on each of the substreams,
>> there is nothing that is inaccessible. The only difference to the old API
>> is that the key is not available in pre-extracted form (i.e. things are not
>> wrapped in tuples). You also don’t need to use a Map, you could also keep
>> the last r.accountNo in the first position of a tuple and the current
>> Transaction in the second—the result then looks exactly the same as it did
>> with streams 1.0. OTOH I suspect that your monoid leaves the accountNo
>> intact, so there is not even a need to allocate those tuples.
>>
>> Would you please explain how our groupBy loses the semantics from SQL?
>> I’m a bit rusty, but the group “name” is also not made available in a
>> dedicated fashion there, AFAIR. The current shape of the API was inspired
>> by BigData analytics tools that model this operation in the same way,
>> because the previous scheme was simply too powerful (it allowed arbitrary
>> dynamic treatment of each of the substreams—SQL grouping is also homogenous
>> and not different for each group). Dynamic elements also defeat
>> optimizations like fusing and graph simplification.
>>
>> Regards,
>>
>> Roland
>>
>>
>> Thanks.
>>
>> On Tue, Apr 26, 2016 at 11:32 PM, Debasish Ghosh <
>> ghosh.debas...@gmail.com> wrote:
>>
>>> Thanks Viktor .. my main issue is that we lose the natural semantics of
>>> groupBy as we learnt from SQL. We cannot access the substreams as separate
>>> abstractions that groupBy creates.
>>>
>>> On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang 
>>> wrote:
>>>
 Debasish,

 The problem with groupBy was that it was too easy to create leaks and
 silently broken solutions with it, but I sympathize with your situation, I
 also felt the original as being more ergonomic. I think there is room for
 improvement in the current solution.
 --
 Cheers,
 √
 On Apr 26, 2016 7:20 PM, "Debasish Ghosh" 
 wrote:

 Roland -

 The problem is I cannot access the substreams separately within the
 fold. In the earlier solution the fold was on individual substreams (not on
 the whole stream). Here the fold gets *all* the elements from all
 substreams and I lose the ability to process substreams separately. Hence I
 lose the ability to compose with a Monoid. Possibly I need to create a
 separate Map and then do the aggregation. It will be clunky .. I am already
 starting to feel the loss of the 1.0 API, which I thought was very
 idiomatic from groupBy point of view.

 Let me see if I can at all get a solution for this ..

 regards.

 On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn 
 wrote:

> Instead of using a fold sink which materializes to a Future you’ll
> need to use a fold combinator which produces the result as its only value
> after the substream has completed, i.e. you keep the computation results
> within the streaming domain a bit longer instead of going to the Future
> domain immediately. Your 1.0 solution must have done something very
> similar, only with a different API (i.e. you folded the substreams and 
> then
> probably used mapAsync to flatten the resulting stream of Futures).
>
> Regards,
>
> Roland
>
> 26 apr 2016 kl.

Re: [akka-user] migrating from an earlier version of akka-streams ..

2016-04-27 Thread Juan José Vázquez Delgado
Wow, this is definitely not well described in the current documentation. I 
went through a similar chain of mistakes and misconceptions regarding the 
semantics of the new groupBy/merge primitives. Thanks Debasish for bringing 
this up here. Regards, Juanjo.

On Wednesday, 27 April 2016 at 7:54:53 (UTC+2), debasish wrote:
>
> Thanks for the clarification. A good point to keep in mind.
>
> regards.
>
> On Wed, Apr 27, 2016 at 11:22 AM, Roland Kuhn  > wrote:
>
> Hi Debasish,
>
> The current fusing algorithm is called «aggressive» for a reason: it will 
> fuse everything it can. To run the subflows in parallel you will have to 
> add a .async after the fold. But that will only be beneficial if the 
> monoid’s |+| is rather expensive.
>
> Regards, Roland 
>
> Sent from my iPhone
>
> On 27 Apr 2016, at 06:10, Debasish Ghosh  > wrote:
>
> Thanks Roland a lot for the clarification .. I can do this ..
>
> val netTxn: RunnableGraph[Future[akka.Done]] = 
> transactions.map(validate)
> .groupBy(MaxGroupCount, _.accountNo)
> .fold(TransactionMonoid.zero)(_ |+| _)
> .mergeSubstreams
>
> Only difference with the 1.0 API is that I cannot do the fold in the Sink 
> - instead I have to do it here. I don't mind, it may even be better this 
> way. Just one clarification .. In the above snippet are the folds on 
> individual substreams done in parallel ?
>
> Thanks again for the help ..
> Regards.
>
>
> On Wed, Apr 27, 2016 at 2:53 AM, Roland Kuhn  > wrote:
>
>
> On 26 Apr 2016, at 20:14, Debasish Ghosh wrote:
>
> Just for the sake of completeness, this works ..
>
> transactions.map(validate)
> .groupBy(MaxGroupCount, _.accountNo)
> .fold(Map.empty[String, Transaction])((l, r) => l |+| 
> Map(r.accountNo -> r))
> .mergeSubstreams
>
>
> Yes, this is what I proposed a while ago :-) (more or less, see below)
>
> Since I cannot access the sub-streams I need to simulate this through a 
> Map ..
>
>
> I don’t fully understand: the fold is running on each of the substreams, 
> there is nothing that is inaccessible. The only difference to the old API 
> is that the key is not available in pre-extracted form (i.e. things are not 
> wrapped in tuples). You also don’t need to use a Map, you could also keep 
> the last r.accountNo in the first position of a tuple and the current 
> Transaction in the second—the result then looks exactly the same as it did 
> with streams 1.0. OTOH I suspect that your monoid leaves the accountNo 
> intact, so there is not even a need to allocate those tuples.
>
> Would you please explain how our groupBy loses the semantics from SQL? I’m 
> a bit rusty, but the group “name” is also not made available in a dedicated 
> fashion there, AFAIR. The current shape of the API was inspired by BigData 
> analytics tools that model this operation in the same way, because the 
> previous scheme was simply too powerful (it allowed arbitrary dynamic 
> treatment of each of the substreams—SQL grouping is also homogenous and not 
> different for each group). Dynamic elements also defeat optimizations like 
> fusing and graph simplification.
>
> Regards,
>
> Roland
>
>
> Thanks.
>
> On Tue, Apr 26, 2016 at 11:32 PM, Debasish Ghosh  > wrote:
>
> Thanks Viktor .. my main issue is that we lose the natural semantics of 
> groupBy as we learnt from SQL. We cannot access the substreams as separate 
> abstractions that groupBy creates.
>
> On Tue, Apr 26, 2016 at 11:28 PM, Viktor Klang  > wrote:
>
> Debasish,
>
> The problem with groupBy was that it was too easy to create leaks and 
> silently broken solutions with it, but I sympathize with your situation, I 
> also felt the original as being more ergonomic. I think there is room for 
> improvement in the current solution.
> -- 
> Cheers,
> √
> On Apr 26, 2016 7:20 PM, "Debasish Ghosh"  > wrote:
>
> Roland -
>
> The problem is I cannot access the substreams separately within the fold. 
> In the earlier solution the fold was on individual substreams (not on the 
> whole stream). Here the fold gets *all* the elements from all substreams 
> and I lose the ability to process substreams separately. Hence I lose the 
> ability to compose with a Monoid. Possibly I need to create a separate Map 
> and then do the aggregation. It will be clunky .. I am already starting to 
> feel the loss of the 1.0 API, which I thought was very idiomatic from 
> groupBy point of view.
>
> Let me see if I can at all get a solution for this ..
>
> regards.
>
> On Tue, Apr 26, 2016 at 10:33 PM, Roland Kuhn  > wrote:
>
> Instead of using a fold sink which materializes to a Future you’ll need to 
> use a fold combinator which produces the result as its only value after the 
> substream has completed, i.e. you keep the computation results within the 
> streaming domain a bit longer instead of going to the Future domain 
> immediately. Your 1.0 solution must have done something very similar, only 
> with a different API (