2016-01-24 12:36 GMT+01:00 Tagir F. Valeev <amae...@gmail.com>:
> Hello!
>
> I'm investigating Stream.limit() performance for parallel ordered
> streams when no SUBSIZED optimization is performed.
>
> Consider the following code, for example:
>
> AtomicInteger counter = new AtomicInteger();
> int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
>         .peek(x -> counter.incrementAndGet()).limit(10).toArray();
> System.out.println(Arrays.toString(result));
> System.out.println(counter.get());
>
> How much would counter.get() print? It changes from launch to launch,
> but usually falls within 375000..625000. This is just insane. On my
> 4-core system the parallel stream creates 16 individual tasks. I expect
> every individual task to consume no more than 10 elements, so in total
> no more than 160 elements should be consumed in this case.
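>
> (As a back-of-the-envelope illustration of where these numbers come from
> on a 4-core machine: the sketch below mimics the halve-until-below-target
> splitting, it is not the actual AbstractTask code.)
>
> int parallelism = java.util.concurrent.ForkJoinPool.getCommonPoolParallelism();
> long targetSize = 1_000_000L / (4L * parallelism); // AbstractTask-style leaf target
> int leaves = 1;
> for (long size = 1_000_000L; size > targetSize; size /= 2) {
>     leaves *= 2;                  // the range is halved until it is at or below the target
> }
> System.out.println(leaves);       // 16 on a typical 4-core box
> System.out.println(leaves * 10);  // 160: the upper bound I would expect with limit(10)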
>
> Here's a patch which addresses this issue:
> http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt
>
> In the limit case, non-root leaf tasks may switch to copyIntoWithCancel
> to track the count of consumed elements and stop consuming once no more
> are necessary.
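>
> (Roughly, the copy-with-cancel idea looks like the sketch below; the names
> are illustrative and loosely modelled on the stream internals, not the
> actual AbstractPipeline/SliceOps code. The leaf advances one element at a
> time and re-checks cancellation instead of draining its whole chunk.)
>
> interface CancellableSink<T> extends java.util.function.Consumer<T> {
>     boolean cancellationRequested(); // true once the slice has seen enough elements
> }
>
> static <T> void copyIntoWithCancel(java.util.Spliterator<T> spliterator,
>                                    CancellableSink<T> sink) {
>     while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)) {
>         // stop as soon as the downstream slicing sink requests cancellation
>     }
> }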
>
> This change also seems to fix, at least partially, the issue mentioned
> in the following comment:
>
> // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
> //     regardless of the value of n
> //     Need to adjust the target size of splitting for the
> //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
> //     This will limit the size of the buffers created at the leaf nodes
> //     cancellation will be more aggressive cancelling later tasks
> //     if the target slice size has been reached from a given task,
> //     cancellation should also clear local results if any
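>
> (For reference, the capping that the comment suggests would look roughly
> like this; suggestCappedTargetSize is a hypothetical name, and my patch
> takes the copyIntoWithCancel route instead of this adjustment:)
>
> static long suggestCappedTargetSize(long sizeEstimate, long k) {
>     long uncapped = Math.max(sizeEstimate / k, 1);  // the current (size / k) behaviour
>     return Math.min(uncapped, 1L << 14);            // cap at 16384 elements per leaf buffer
> }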
>
> I checked with the following code:
>
> for (int n : new int[] {10, 100, 1000, 5000, 10000,
>                         50000, 100000, 500000, 1000000}) {
>     System.out.println(n);
>     long[] arr = LongStream.range(0, Long.MAX_VALUE)
>             .filter(i -> true).parallel().limit(n).toArray();
>     long[] ref = LongStream.range(0, n).toArray();
>     System.out.println(Arrays.equals(arr, ref));
> }
>
> It works correctly after applying my patch (while it dies with an OOME
> without the patch, as the comment suggests).
>
> The existing unit tests also pass with my patch. However, I'm very new
> to the internals of parallel stream processing, so it's possible that
> I'm missing something. Please review! If this looks reasonable, I will
> file an issue and write new test cases.
>
> Thank you in advance,
> With best regards,
> Tagir Valeev.
>


Hi Tagir,

I'm not an expert. To me, your suggestion appears to be very sensible.

Regards,
Stefan
