Hello!

I'm investigating Stream.limit() performance for parallel ordered
streams when no SUBSIZED optimization is performed.

Consider the following code, for example:

AtomicInteger counter = new AtomicInteger();
int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
        .peek(x -> counter.incrementAndGet()).limit(10).toArray();
System.out.println(Arrays.toString(result));
System.out.println(counter.get());

What would counter.get() print? It varies from launch to
launch, but is usually within 375000..625000. This is just insane. On my
4-core system the parallel stream creates 16 individual tasks. I expect
that every individual task should consume no more than 10 elements, so
in total no more than 160 elements should be consumed in this case.
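
For anyone who wants to reproduce the numbers, here is a self-contained
version of the snippet above. It is only a measurement harness: the class
and method names are made up for this message, and the figure of 16 leaf
tasks is just what I observe on my 4-core machine, so the "expected" bound
may well differ on yours.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LimitCounterDemo {
    // Runs the pipeline from this message once and returns how many
    // elements actually passed through peek().
    static int consumedFor(int limit) {
        AtomicInteger counter = new AtomicInteger();
        int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
                .peek(x -> counter.incrementAndGet()).limit(limit).toArray();
        if (result.length != limit) {
            throw new AssertionError("unexpected result length: " + result.length);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        int limit = 10;
        int leafTasks = 16; // assumption: the task count reported above for a 4-core system
        System.out.println("expected upper bound: " + leafTasks * limit);
        // The consumed count is nondeterministic, so print several runs.
        for (int run = 0; run < 5; run++) {
            System.out.println("run " + run + ": consumed " + consumedFor(limit));
        }
    }
}
```

The consumed count depends on scheduling, so the interesting part is how
far the printed values are from the expected bound, not any single value.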

Here's a patch which addresses this issue:
http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt

In the limit case, non-root leaf tasks may switch to copyIntoWithCancel
to control the count of consumed elements, so they do not consume more
than necessary.
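
To illustrate the idea outside of the JDK internals, here is a minimal
sketch of a leaf that pulls elements one at a time and stops as soon as
it has enough. This is not the code from the patch: takeAtMost is a
hypothetical helper written for this message, and it uses only the public
Spliterator.OfInt API rather than the internal Sink machinery.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.stream.IntStream;

public class CancellableLeafSketch {
    // Hypothetical helper, not JDK code: advances the spliterator element by
    // element and stops once 'limit' elements have been collected, instead of
    // draining the whole chunk with forEachRemaining().
    static int[] takeAtMost(Spliterator.OfInt source, int limit) {
        int[] buffer = new int[limit];
        int[] size = {0}; // one-element array so the lambda can update it
        while (size[0] < limit
                && source.tryAdvance((int v) -> buffer[size[0]++] = v)) {
            // nothing to do here: the consumer above stores the element
        }
        return Arrays.copyOf(buffer, size[0]);
    }

    public static void main(String[] args) {
        Spliterator.OfInt chunk = IntStream.range(0, 1_000_000).spliterator();
        int[] taken = takeAtMost(chunk, 10);
        System.out.println(Arrays.toString(taken));
        // Whatever the leaf did not need is still left in the source.
        System.out.println("remaining estimate: " + chunk.estimateSize());
    }
}
```

The limit is checked before each tryAdvance call, so the source is never
advanced past the last element that is actually kept.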

This change also seems to fix, at least partially, the issue described
in the following code comment:

// @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
//     regardless of the value of n
//     Need to adjust the target size of splitting for the
//     SliceTask from say (size / k) to say min(size / k, 1 << 14)
//     This will limit the size of the buffers created at the leaf nodes
//     cancellation will be more aggressive cancelling later tasks
//     if the target slice size has been reached from a given task,
//     cancellation should also clear local results if any

I checked with the following code:

for (int n : new int[] {10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}) {
    System.out.println(n);
    long[] arr = LongStream.range(0, Long.MAX_VALUE).filter(i -> true)
            .parallel().limit(n).toArray();
    long[] ref = LongStream.range(0, n).toArray();
    System.out.println(Arrays.equals(arr, ref));
}

It works correctly with my patch applied, whereas without the patch it
dies with an OOME, as the comment suggests.

The existing unit tests also pass with my patch. However, I'm
very new to the internals of parallel stream processing, so it's
possible that I'm missing something. Please review! If this looks
reasonable, I will log an issue and write new test cases.

Thank you in advance,
With best regards,
Tagir Valeev.
