GitHub user justinleet commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/463#discussion_r102776136
  
    --- Diff: metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java ---
    @@ -97,88 +110,73 @@ public void testSequentialStreamLargeBatch() throws FileNotFoundException {
           Map<String, Integer> count =
                   stream.map(s -> s.trim())
                           .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
    -      Assert.assertEquals(5, count.size());
    -      Assert.assertEquals(3, (int) count.get("foo"));
    -      Assert.assertEquals(2, (int) count.get("bar"));
    -      Assert.assertEquals(1, (int) count.get("and"));
    -      Assert.assertEquals(1, (int) count.get("the"));
    +      validateMapCount(count);
         }
       }
     
    -  @Test
    -  public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
    -    //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used
    -    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
    -      ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    -      forkJoinPool.submit(() -> {
    -                Map<String, Integer> threads =
    -                        stream.parallel().map(s -> Thread.currentThread().getName())
    -                                .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
    -                Assert.assertTrue(threads.size() <= 2);
    -              }
    -      ).get();
    -    }
    -  }
    +  private int getNumberOfBatches(final ReaderSpliterator spliterator) throws ExecutionException, InterruptedException {
    +    final AtomicInteger numSplits = new AtomicInteger(0);
    +    //we want to wrap the spliterator and count the (valid) splits
    +    Spliterator<String> delegatingSpliterator = new Spliterator<String>() {
    +      @Override
    +      public boolean tryAdvance(Consumer<? super String> action) {
    +        return spliterator.tryAdvance(action);
    +      }
     
    -  @Test
    -  public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
    -    //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
    -    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
    -      ForkJoinPool forkJoinPool = new ForkJoinPool(10);
    -      forkJoinPool.submit(() -> {
    -                Map<String, Integer> threads =
    -                        stream.parallel().map(s -> Thread.currentThread().getName())
    -                                .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
    -                Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
    -              }
    -      ).get();
    -    }
    +      @Override
    +      public Spliterator<String> trySplit() {
    +        Spliterator<String> ret = spliterator.trySplit();
    +        if(ret != null) {
    +          numSplits.incrementAndGet();
    +        }
    +        return ret;
    +      }
    +
    +      @Override
    +      public long estimateSize() {
    +        return spliterator.estimateSize();
    +      }
    +
    +      @Override
    +      public int characteristics() {
    +        return spliterator.characteristics();
    +      }
    +    };
    +
    +    Stream<String> stream = StreamSupport.stream(delegatingSpliterator, true);
    +
    +    //now run it in a parallel pool and do some calculation that doesn't really matter.
    +    ForkJoinPool forkJoinPool = new ForkJoinPool(10);
    --- End diff ---
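The wrapping trick in `getNumberOfBatches` (delegate everything, count the `trySplit()` calls that return non-null) can be tried outside the Metron test. The sketch below is an illustration under stated assumptions, not the PR's code. It swaps `ReaderSpliterator` for a plain list's spliterator, so a "split" here is just whatever the list hands back. The names `CountingSpliterator` and `countRootSplits` are hypothetical. Like the anonymous class in the diff, it counts only successful splits of the wrapped root spliterator. Children returned from `trySplit()` are passed through unwrapped, so any later splits of them are not counted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SplitCountingSketch {

  // Delegates every call to the wrapped spliterator and counts the trySplit()
  // calls on it that succeed (return non-null). Spliterators returned by
  // trySplit() are handed back unwrapped, so their own splits are NOT counted.
  static final class CountingSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> delegate;
    private final AtomicInteger successfulSplits;

    CountingSpliterator(Spliterator<T> delegate, AtomicInteger successfulSplits) {
      this.delegate = delegate;
      this.successfulSplits = successfulSplits;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
      return delegate.tryAdvance(action);
    }

    @Override
    public Spliterator<T> trySplit() {
      Spliterator<T> split = delegate.trySplit();
      if (split != null) {
        successfulSplits.incrementAndGet();
      }
      return split;
    }

    @Override
    public long estimateSize() {
      return delegate.estimateSize();
    }

    @Override
    public int characteristics() {
      return delegate.characteristics();
    }
  }

  // Hypothetical helper: runs a parallel pipeline over the list and returns
  // how many times the root spliterator was successfully split.
  static int countRootSplits(List<String> lines) {
    AtomicInteger splits = new AtomicInteger(0);
    Spliterator<String> counting = new CountingSpliterator<>(lines.spliterator(), splits);
    // collect() traverses every element here; count() is avoided because the
    // Stream.count() contract allows answering from a known size without
    // executing the pipeline (and therefore without splitting).
    List<String> trimmed = StreamSupport.stream(counting, true)
        .map(String::trim)
        .collect(Collectors.toList());
    if (trimmed.size() != lines.size()) {
      throw new IllegalStateException("elements were lost");
    }
    return splits.get();
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList(
        "foo", "bar", "foo", "and", "the", "foo", "bar", "baz", "qux");
    // The exact number depends on the JDK and the common pool's parallelism.
    System.out.println("successful root splits: " + countRootSplits(lines));
  }
}
```

With a parallel stream, the framework keeps calling `trySplit()` on the root while its size estimate exceeds an internal target size. For nine elements that yields at least one split, and a single-element list yields none.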
    
    Incredibly minor point, but since we no longer care about how the work is actually executed (the thread-count assertions are gone) and we aren't running it a lot, it seems appropriate to just use ForkJoinPool.commonPool() and drop the shutdown line.
    
    Whether to change this is entirely up to you; I don't consider it blocking by any means.
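The suggestion can be sketched outside the test. This is an illustration under assumptions: `countWords`, `withDedicatedPool`, and `withCommonPool` are hypothetical names, and a small word count stands in for the test's throwaway calculation. The dedicated-pool variant mirrors the diff's `new ForkJoinPool(10)`, so the code that creates the pool also has to shut it down. The common-pool variant uses `ForkJoinPool.commonPool()`, which the JVM owns; the `ForkJoinPool` Javadoc states that `shutdown()` has no effect on it, so there is nothing to clean up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class PoolChoiceSketch {

  // Hypothetical workload standing in for "some calculation that doesn't
  // really matter": a word count over a parallel stream.
  static Map<String, Integer> countWords(List<String> words) {
    return words.parallelStream()
        .map(String::trim)
        .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
  }

  // Dedicated pool: whoever creates it owns its lifecycle and must shut it down.
  static Map<String, Integer> withDedicatedPool(List<String> words)
      throws ExecutionException, InterruptedException {
    ForkJoinPool pool = new ForkJoinPool(10);
    try {
      // The parallel stream forks its subtasks into the pool running this task.
      return pool.submit(() -> countWords(words)).get();
    } finally {
      pool.shutdown();
    }
  }

  // Common pool: shared and JVM-managed; shutdown() would be a no-op, so none is needed.
  static Map<String, Integer> withCommonPool(List<String> words)
      throws ExecutionException, InterruptedException {
    return ForkJoinPool.commonPool().submit(() -> countWords(words)).get();
  }

  public static void main(String[] args) throws Exception {
    List<String> words = Arrays.asList("foo", "bar", " foo", "bar ", "foo");
    // Both variants produce {foo=3, bar=2}, so this prints true.
    System.out.println(withCommonPool(words).equals(withDedicatedPool(words)));
  }
}
```

The results are identical; the only difference is who owns the pool, which is why the common-pool version needs no `shutdown()` step.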

