RFR-8148250: Stream.limit parallel ordered performance

2016-01-29 Thread Tagir F. Valeev
Hello!

Here's webrev:
http://cr.openjdk.java.net/~tvaleev/webrev/8148250/r1/

With best regards,
Tagir Valeev.

>> On 26 Jan 2016, at 16:51, Tagir F. Valeev  wrote:
>> 
>> Hello!
>> 
>> Thank you for review! Here's the issue:
>> https://bugs.openjdk.java.net/browse/JDK-8148250
>> Will post complete webrev later.
>> 
>> PS> Note that it is still the case that in almost all scenarios this
>> PS> is likely to be a bad form of parallel stream.
>> 
>> Well, it's not always possible to estimate the size of the stream in
>> advance. Consider that we have a user-specified filter upstream which
>> searches over a big collection (we want to return the first 10 search
>> results, and order is important):
>> 
>> data.parallelStream()
>>.filter(element -> userQuery.test(element))
>>.limit(10).collect(toList());
>> 
>> If the user query produces 10-15 results, using a parallel stream is very
>> reasonable, but if it produces millions of results it should not
>> regress very much (at least, it should not become much slower than the
>> sequential version, which is what we see currently).

PS> I have my doubts that the cost of splitting and filtering a small
PS> number of elements concurrently will pay off in the majority of
PS> scenarios, hence the “almost all”.

PS> It could work in cases where there is lots of data to be filtered
PS> and only a few items are reported that are proportionally spread
PS> out, or over small data and the filter operation is costly.

PS> In any case it’s good to avoid the OOME, I am very glad you found a simple
PS> way to resolve that.


>> 
>> PS> I think the comment you refer to still applies but now for larger n, so
>> PS> we should refine it.
>> 
>> Should we replace "regardless of the value of n" with "when
>> n*parallelismLevel is sufficiently large"?

PS> Yes, when N * P is sufficiently large, e.g. (to pluck a number out of the
PS> air) greater than 2^32, say.

PS> Paul.



Re: RFR-8148250: Stream.limit parallel ordered performance

2016-01-29 Thread Paul Sandoz

> On 29 Jan 2016, at 14:47, Tagir F. Valeev  wrote:
> 
> Hello!
> 
> Here's webrev:
> http://cr.openjdk.java.net/~tvaleev/webrev/8148250/r1/
> 

Thanks, reviewed and in my queue to push.

Paul.



Re: Stream.limit parallel ordered performance

2016-01-26 Thread Paul Sandoz
Hi Tagir,

It is insane :-) In hindsight I cannot believe I missed this trick!

Your patch looks reasonable and I don’t see any issue with it. Each leaf task
will collect at most n elements. The truncation will take care of everything
else.
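
To put the same reasoning as a toy model (not the real Node tree; the names
here are made up for illustration): each leaf contributes a buffer capped at
n, and truncating the ordered concatenation yields exactly the first n
elements.

import java.util.ArrayList;
import java.util.List;

class TruncationToyModel {
    // Concatenate per-leaf buffers (each already capped at n elements)
    // in encounter order, then truncate to the first n.
    static <T> List<T> combine(List<List<T>> leafResults, int n) {
        List<T> all = new ArrayList<>();
        for (List<T> leaf : leafResults) {
            all.addAll(leaf);   // each leaf holds at most n elements
        }
        return all.subList(0, Math.min(n, all.size()));
    }
}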

Note that it is still the case that in almost all scenarios this is likely to 
be a bad form of parallel stream.

I think the comment you refer to still applies but now for larger n, so we 
should refine it.

Paul.

> On 24 Jan 2016, at 12:36, Tagir F. Valeev  wrote:
> 
> Hello!
> 
> I'm investigating Stream.limit() performance for parallel ordered
> streams when no SUBSIZED optimization is performed.
> 
> Consider the following code, for example:
> 
> AtomicInteger counter = new AtomicInteger();
> int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
>.peek(x -> counter.incrementAndGet()).limit(10).toArray();
> System.out.println(Arrays.toString(result));
> System.out.println(counter.get());
> 
> How much would counter.get() print? It changes from launch to
> launch, but is usually within 375000..625000. This is just insane. On my
> 4-core system the parallel stream creates 16 individual tasks. I expect
> that every individual task should consume no more than 10 elements, so
> in total no more than 160 elements should be consumed in this case.
> 
> Here's a patch which addresses this issue:
> http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt
> 
> In the limit case non-root leaf tasks may switch to copyIntoWithCancel
> to control the count of consumed elements and not consume more than
> necessary.
> 
> This change seems to fix the issue described in this comment (at least
> partially):
> 
> // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
> // regardless of the value of n
> // Need to adjust the target size of splitting for the
> // SliceTask from say (size / k) to say min(size / k, 1 << 14)
> // This will limit the size of the buffers created at the leaf nodes
> // cancellation will be more aggressive cancelling later tasks
> // if the target slice size has been reached from a given task,
> // cancellation should also clear local results if any
> 
> I checked with the following code:
> 
> for(int n : new int[] {10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}) {
>    System.out.println(n);
>    long[] arr = LongStream.range(0, Long.MAX_VALUE).filter(i -> true).parallel().limit(n).toArray();
>    long[] ref = LongStream.range(0, n).toArray();
>    System.out.println(Arrays.equals(arr, ref));
> }
> 
> It works correctly after applying my patch (while it dies with OOME
> without the patch, as the comment suggests).
> 
> Currently existing unit tests also pass with my patch. However, I'm
> very new to the internals of parallel stream processing, so it's
> possible that I'm missing something. Please review! If this looks
> reasonable I will log an issue and write new test cases.
> 
> Thank you in advance,
> With best regards,
> Tagir Valeev.
> 



Re: 8148250 Stream.limit parallel ordered performance

2016-01-26 Thread Tagir F. Valeev
Hello!

Thank you for review! Here's the issue:
https://bugs.openjdk.java.net/browse/JDK-8148250
Will post complete webrev later.

PS> Note that it is still the case that in almost all scenarios this
PS> is likely to be a bad form of parallel stream.

Well, it's not always possible to estimate the size of the stream in
advance. Consider that we have a user-specified filter upstream which
searches over a big collection (we want to return the first 10 search
results, and order is important):

data.parallelStream()
.filter(element -> userQuery.test(element))
.limit(10).collect(toList());

If the user query produces 10-15 results, using a parallel stream is very
reasonable, but if it produces millions of results it should not
regress very much (at least, it should not become much slower than the
sequential version, which is what we see currently).
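
To make the comparison concrete, here is a rough probe (a sketch reusing the
peek-on-a-counter trick from my original mail; ConsumptionProbe and its
parameters are made-up names) that reports how many upstream elements the
pipeline consumes in each mode:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;

class ConsumptionProbe {
    // Counts upstream elements consumed by filter(...).limit(10),
    // sequentially or in parallel.
    static <T> int consumed(List<T> data, Predicate<T> userQuery, boolean parallel) {
        AtomicInteger counter = new AtomicInteger();
        Stream<T> stream = parallel ? data.parallelStream() : data.stream();
        stream.filter(userQuery)
              .peek(e -> counter.incrementAndGet())
              .limit(10)
              .collect(toList());
        return counter.get();
    }
}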

PS> I think the comment you refer to still applies but now for larger n, so
PS> we should refine it.

Should we replace "regardless of the value of n" with "when
n*parallelismLevel is sufficiently large"?

With best regards,
Tagir Valeev.

PS> Paul.

>> On 24 Jan 2016, at 12:36, Tagir F. Valeev  wrote:
>> 
>> Hello!
>> 
>> I'm investigating Stream.limit() performance for parallel ordered
>> streams when no SUBSIZED optimization is performed.
>> 
>> Consider the following code, for example:
>> 
>> AtomicInteger counter = new AtomicInteger();
>> int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
>>.peek(x -> counter.incrementAndGet()).limit(10).toArray();
>> System.out.println(Arrays.toString(result));
>> System.out.println(counter.get());
>> 
>> How much would counter.get() print? It changes from launch to
>> launch, but is usually within 375000..625000. This is just insane. On my
>> 4-core system the parallel stream creates 16 individual tasks. I expect
>> that every individual task should consume no more than 10 elements, so
>> in total no more than 160 elements should be consumed in this case.
>> 
>> Here's a patch which addresses this issue:
>> http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt
>> 
>> In the limit case non-root leaf tasks may switch to copyIntoWithCancel
>> to control the count of consumed elements and not consume more than
>> necessary.
>> 
>> This change seems to fix the issue described in this comment (at least
>> partially):
>> 
>> // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
>> // regardless of the value of n
>> // Need to adjust the target size of splitting for the
>> // SliceTask from say (size / k) to say min(size / k, 1 << 14)
>> // This will limit the size of the buffers created at the leaf nodes
>> // cancellation will be more aggressive cancelling later tasks
>> // if the target slice size has been reached from a given task,
>> // cancellation should also clear local results if any
>> 
>> I checked with the following code:
>> 
>> for(int n : new int[] {10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}) {
>>    System.out.println(n);
>>    long[] arr = LongStream.range(0, Long.MAX_VALUE).filter(i -> true).parallel().limit(n).toArray();
>>    long[] ref = LongStream.range(0, n).toArray();
>>    System.out.println(Arrays.equals(arr, ref));
>> }
>> 
>> It works correctly after applying my patch (while it dies with OOME
>> without the patch, as the comment suggests).
>> 
>> Currently existing unit tests also pass with my patch. However, I'm
>> very new to the internals of parallel stream processing, so it's
>> possible that I'm missing something. Please review! If this looks
>> reasonable I will log an issue and write new test cases.
>> 
>> Thank you in advance,
>> With best regards,
>> Tagir Valeev.
>> 



Re: 8148250 Stream.limit parallel ordered performance

2016-01-26 Thread Paul Sandoz

> On 26 Jan 2016, at 16:51, Tagir F. Valeev  wrote:
> 
> Hello!
> 
> Thank you for review! Here's the issue:
> https://bugs.openjdk.java.net/browse/JDK-8148250
> Will post complete webrev later.
> 
> PS> Note that it is still the case that in almost all scenarios this
> PS> is likely to be a bad form of parallel stream.
> 
> Well, it's not always possible to estimate the size of the stream in
> advance. Consider that we have a user-specified filter upstream which
> searches over a big collection (we want to return the first 10 search
> results, and order is important):
> 
> data.parallelStream()
>.filter(element -> userQuery.test(element))
>.limit(10).collect(toList());
> 
> If the user query produces 10-15 results, using a parallel stream is very
> reasonable, but if it produces millions of results it should not
> regress very much (at least, it should not become much slower than the
> sequential version, which is what we see currently).

I have my doubts that the cost of splitting and filtering a small number of 
elements concurrently will pay off in the majority of scenarios, hence the 
“almost all”.

It could work in cases where there is lots of data to be filtered and only a 
few items are reported that are proportionally spread out, or over small data 
and the filter operation is costly.
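
For instance (a made-up illustration, not from the webrev; the pattern and
the lines parameter are placeholders), a costly predicate over small data:

import java.util.List;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;

class CostlyFilterExample {
    private static final Pattern NEEDLE = Pattern.compile("\\b(foo|bar)\\w{0,8}baz\\b");

    // The per-element regex work can dwarf the splitting overhead,
    // making parallel limit(10) worthwhile even over small data.
    static List<String> firstTenMatches(List<String> lines) {
        return lines.parallelStream()
                .filter(s -> NEEDLE.matcher(s).find())
                .limit(10)
                .collect(toList());
    }
}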

In any case it’s good to avoid the OOME, I am very glad you found a simple way
to resolve that.


> 
> PS> I think the comment you refer to still applies but now for larger n, so
> PS> we should refine it.
> 
> Should we replace "regardless of the value of n" with "when
> n*parallelismLevel is sufficiently large"?

Yes, when N * P is sufficiently large, e.g. (to pluck a number out of the air)
greater than 2^32, say.
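
Concretely, the refined comment might then read something like (a sketch of
the wording only, not the committed text):

// @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
//     when n * parallelismLevel is sufficiently large (say, greater than 2^32)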

Paul.



Re: Stream.limit parallel ordered performance

2016-01-25 Thread Stefan Zobel
2016-01-24 12:36 GMT+01:00 Tagir F. Valeev:
> Hello!
>
> I'm investigating Stream.limit() performance for parallel ordered
> streams when no SUBSIZED optimization is performed.
>
> Consider the following code, for example:
>
> AtomicInteger counter = new AtomicInteger();
> int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
> .peek(x -> counter.incrementAndGet()).limit(10).toArray();
> System.out.println(Arrays.toString(result));
> System.out.println(counter.get());
>
> How much would counter.get() print? It changes from launch to
> launch, but is usually within 375000..625000. This is just insane. On my
> 4-core system the parallel stream creates 16 individual tasks. I expect
> that every individual task should consume no more than 10 elements, so
> in total no more than 160 elements should be consumed in this case.
>
> Here's a patch which addresses this issue:
> http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt
>
> In the limit case non-root leaf tasks may switch to copyIntoWithCancel
> to control the count of consumed elements and not consume more than
> necessary.
>
> This change seems to fix the issue described in this comment (at least
> partially):
>
> // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
> // regardless of the value of n
> // Need to adjust the target size of splitting for the
> // SliceTask from say (size / k) to say min(size / k, 1 << 14)
> // This will limit the size of the buffers created at the leaf nodes
> // cancellation will be more aggressive cancelling later tasks
> // if the target slice size has been reached from a given task,
> // cancellation should also clear local results if any
>
> I checked with the following code:
>
> for(int n : new int[] {10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}) {
>     System.out.println(n);
>     long[] arr = LongStream.range(0, Long.MAX_VALUE).filter(i -> true).parallel().limit(n).toArray();
>     long[] ref = LongStream.range(0, n).toArray();
>     System.out.println(Arrays.equals(arr, ref));
> }
>
> It works correctly after applying my patch (while it dies with OOME
> without the patch, as the comment suggests).
>
> Currently existing unit tests also pass with my patch. However, I'm
> very new to the internals of parallel stream processing, so it's
> possible that I'm missing something. Please review! If this looks
> reasonable I will log an issue and write new test cases.
>
> Thank you in advance,
> With best regards,
> Tagir Valeev.
>


Hi Tagir,

I'm not an expert. To me, your suggestion appears to be very sensible.

Regards,
Stefan


Stream.limit parallel ordered performance

2016-01-24 Thread Tagir F. Valeev
Hello!

I'm investigating Stream.limit() performance for parallel ordered
streams when no SUBSIZED optimization is performed.

Consider the following code, for example:

AtomicInteger counter = new AtomicInteger();
int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
.peek(x -> counter.incrementAndGet()).limit(10).toArray();
System.out.println(Arrays.toString(result));
System.out.println(counter.get());

How much would counter.get() print? It changes from launch to
launch, but is usually within 375000..625000. This is just insane. On my
4-core system the parallel stream creates 16 individual tasks. I expect
that every individual task should consume no more than 10 elements, so
in total no more than 160 elements should be consumed in this case.

Here's a patch which addresses this issue:
http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt

In the limit case non-root leaf tasks may switch to copyIntoWithCancel
to control the count of consumed elements and not consume more than
necessary.
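
To illustrate the idea (a simplified conceptual sketch using only public
APIs, not the actual SliceOps internals; the class and method names are made
up): a leaf pulls elements one at a time with tryAdvance and stops as soon as
it has buffered the n elements the limit could possibly need, instead of
draining its whole chunk with forEachRemaining.

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

class CancellableCopySketch {
    // Mirrors the per-element cancellation check that copyIntoWithCancel
    // performs: stop consuming once n elements are buffered.
    static <T> List<T> copyWithCancel(Spliterator<T> spliterator, long n) {
        List<T> buffer = new ArrayList<>();
        Consumer<T> sink = buffer::add;
        while (buffer.size() < n && spliterator.tryAdvance(sink)) {
            // each iteration consumes exactly one upstream element
        }
        return buffer;
    }
}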

This change seems to fix the issue described in this comment (at least
partially):

// @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
// regardless of the value of n
// Need to adjust the target size of splitting for the
// SliceTask from say (size / k) to say min(size / k, 1 << 14)
// This will limit the size of the buffers created at the leaf nodes
// cancellation will be more aggressive cancelling later tasks
// if the target slice size has been reached from a given task,
// cancellation should also clear local results if any

I checked with the following code:

for(int n : new int[] {10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}) {
    System.out.println(n);
    long[] arr = LongStream.range(0, Long.MAX_VALUE).filter(i -> true).parallel().limit(n).toArray();
    long[] ref = LongStream.range(0, n).toArray();
    System.out.println(Arrays.equals(arr, ref));
}

It works correctly after applying my patch (while it dies with OOME
without the patch, as the comment suggests).

Currently existing unit tests also pass with my patch. However, I'm
very new to the internals of parallel stream processing, so it's
possible that I'm missing something. Please review! If this looks
reasonable I will log an issue and write new test cases.

Thank you in advance,
With best regards,
Tagir Valeev.