Github user justinleet commented on a diff in the pull request: https://github.com/apache/incubator-metron/pull/463#discussion_r102776136 --- Diff: metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java --- @@ -97,88 +110,73 @@ public void testSequentialStreamLargeBatch() throws FileNotFoundException { Map<String, Integer> count = stream.map(s -> s.trim()) .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); - Assert.assertEquals(5, count.size()); - Assert.assertEquals(3, (int) count.get("foo")); - Assert.assertEquals(2, (int) count.get("bar")); - Assert.assertEquals(1, (int) count.get("and")); - Assert.assertEquals(1, (int) count.get("the")); + validateMapCount(count); } } - @Test - public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException { - //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used - try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) { - ForkJoinPool forkJoinPool = new ForkJoinPool(2); - forkJoinPool.submit(() -> { - Map<String, Integer> threads = - stream.parallel().map(s -> Thread.currentThread().getName()) - .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); - Assert.assertTrue(threads.size() <= 2); - } - ).get(); - } - } + private int getNumberOfBatches(final ReaderSpliterator spliterator) throws ExecutionException, InterruptedException { + final AtomicInteger numSplits = new AtomicInteger(0); + //we want to wrap the spliterator and count the (valid) splits + Spliterator<String> delegatingSpliterator = new Spliterator<String>() { + @Override + public boolean tryAdvance(Consumer<? 
super String> action) { + return spliterator.tryAdvance(action); + } - @Test - public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException { - //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used - try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) { - ForkJoinPool forkJoinPool = new ForkJoinPool(10); - forkJoinPool.submit(() -> { - Map<String, Integer> threads = - stream.parallel().map(s -> Thread.currentThread().getName()) - .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); - Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1); - } - ).get(); - } + @Override + public Spliterator<String> trySplit() { + Spliterator<String> ret = spliterator.trySplit(); + if(ret != null) { + numSplits.incrementAndGet(); + } + return ret; + } + + @Override + public long estimateSize() { + return spliterator.estimateSize(); + } + + @Override + public int characteristics() { + return spliterator.characteristics(); + } + }; + + Stream<String> stream = StreamSupport.stream(delegatingSpliterator, true); + + //now run it in a parallel pool and do some calculation that doesn't really matter. + ForkJoinPool forkJoinPool = new ForkJoinPool(10); --- End diff -- Incredibly minor point, but since we no longer care about the actual execution and aren't running it a lot, it seems appropriate to just use ForkJoinPool.commonPool() and drop the shutdown line. Whether to change it is entirely up to you; I don't consider it blocking by any means.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---