2016-01-24 12:36 GMT+01:00 Tagir F. Valeev <amae...@gmail.com>:
> Hello!
>
> I'm investigating Stream.limit() performance for parallel ordered
> streams when no SUBSIZED optimization is performed.
>
> Consider the following code, for example:
>
> AtomicInteger counter = new AtomicInteger();
> int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
>     .peek(x -> counter.incrementAndGet()).limit(10).toArray();
> System.out.println(Arrays.toString(result));
> System.out.println(counter.get());
>
> What would counter.get() print? It changes from launch to
> launch, but is usually within 375000..625000. This is just insane. On my
> 4-core system the parallel stream creates 16 individual tasks. I expect
> that every individual task should consume no more than 10 elements, so
> in total no more than 160 elements should be consumed in this case.
>
> Here's a patch which addresses this issue:
> http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt
>
> In the limit case non-root leaf tasks may switch to copyIntoWithCancel
> to control the count of consumed elements, so that they do not consume
> more than necessary.
>
> This change seems to fix the issue described in the comment (at least
> partially):
>
> // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
> //     regardless of the value of n
> //     Need to adjust the target size of splitting for the
> //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
> //     This will limit the size of the buffers created at the leaf nodes
> //     cancellation will be more aggressive cancelling later tasks
> //     if the target slice size has been reached from a given task,
> //     cancellation should also clear local results if any
>
> I checked with the following code:
>
> for (int n : new int[] {10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}) {
>     System.out.println(n);
>     long[] arr = LongStream.range(0, Long.MAX_VALUE).filter(i -> true).parallel().limit(n).toArray();
>     long[] ref = LongStream.range(0, n).toArray();
>     System.out.println(Arrays.equals(arr, ref));
> }
>
> It works correctly after applying my patch (whereas it dies with an OOME
> without the patch, as the comment suggests).
>
> The existing unit tests also pass with my patch. However, I'm
> very new to the internals of parallel stream processing, so it's
> possible that I'm missing something. Please review! If this looks
> reasonable I will log an issue and write new test cases.
>
> Thank you in advance,
> With best regards,
> Tagir Valeev.

Hi Tagir,

I'm not an expert. To me, your suggestion appears to be very sensible.

Regards,
Stefan
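
For anyone who wants to reproduce the counter experiment from the quoted message, here is a minimal self-contained sketch. The class and method names (LimitConsumption, Run, run) are made up for illustration and are not part of the patch or the JDK. It only runs the same pipeline sequentially and in parallel and reports how many elements reached peek(); it does not apply or emulate the proposed change. The parallel count varies between runs and machines, so no particular number should be expected.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical helper, for illustration only.
class LimitConsumption {

    /** Outcome of one run: the limited output and how many elements reached peek(). */
    static final class Run {
        final int[] result;
        final int consumed;

        Run(int[] result, int consumed) {
            this.result = result;
            this.consumed = consumed;
        }
    }

    /** Runs the pipeline from the quoted message, sequentially or in parallel. */
    static Run run(boolean parallel, int limit) {
        AtomicInteger counter = new AtomicInteger();
        IntStream source = IntStream.range(0, 1_000_000);
        if (parallel) {
            source = source.parallel();
        }
        int[] result = source
                .filter(x -> true)                     // makes the stream size unknown
                .peek(x -> counter.incrementAndGet())  // counts elements pulled from upstream
                .limit(limit)
                .toArray();
        return new Run(result, counter.get());
    }

    public static void main(String[] args) {
        Run seq = run(false, 10);
        Run par = run(true, 10);
        System.out.println("sequential: " + Arrays.toString(seq.result)
                + " consumed=" + seq.consumed);
        System.out.println("parallel:   " + Arrays.toString(par.result)
                + " consumed=" + par.consumed);
    }
}
```

Because the stream is ordered, both runs should return the same first ten elements; only the number of elements consumed upstream of limit() differs, which is the quantity the message is concerned with.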