This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new e426d370ea Start with solo accumulator and empty partition (#14426) e426d370ea is described below commit e426d370ea974546fdb7fe6f43e44fba015181d7 Author: Pranav <pranavbh...@gmail.com> AuthorDate: Wed Jun 14 16:20:48 2023 -0700 Start with solo accumulator and empty partition (#14426) * Starting parallel merge with solo accumulator and empty partitions * shut down pool in test --- .../guava/ParallelMergeCombiningSequence.java | 4 ++ .../guava/ParallelMergeCombiningSequenceTest.java | 78 +++++++++++++++++++++- 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 9c39c29d06..1f09fc9163 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -127,6 +127,10 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T> // less chance of blocking the merge final BlockingQueue<ResultBatch<T>> outputQueue = new ArrayBlockingQueue<>(4 * queueSize); final MergeCombineMetricsAccumulator metricsAccumulator = new MergeCombineMetricsAccumulator(inputSequences.size()); + // Starting with empty partitionMetrics + metricsAccumulator.setPartitions(Collections.emptyList()); + // starting with solo merge accumulator + metricsAccumulator.setMergeMetrics(new MergeCombineActionMetricsAccumulator()); MergeCombinePartitioningAction<T> mergeCombineAction = new MergeCombinePartitioningAction<>( inputSequences, orderingFn, diff --git a/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java 
b/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java index 057ffcf0fb..49bd361402 100644 --- a/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java @@ -109,7 +109,7 @@ public class ParallelMergeCombiningSequenceTest } @Test - public void testOrderedResultBatchFromSequenceBackToYielderOnSequence() throws IOException + public void testOrderedResultBatchFromSequenceBacktoYielderOnSequence() throws IOException { final int batchSize = 128; final int sequenceSize = 5_000; @@ -285,6 +285,27 @@ public class ParallelMergeCombiningSequenceTest input.add(nonBlockingSequence(5)); assertResult(input); } + @Test + public void testMergeCombineMetricsAccumulatorNPEOnBadExecutorPool() throws Exception + { + // below min threshold, so will merge serially + List<Sequence<IntPair>> input = new ArrayList<>(); + input.add(nonBlockingSequence(5)); + input.add(nonBlockingSequence(6)); + // Simulates the bad/occupied executor pool, it does not execute any task submitted to it + ForkJoinPool customBadPool = new ForkJoinPool( + 1, + pool -> null, + (t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t), + true + ); + expectedException.expect(QueryTimeoutException.class); + expectedException.expectMessage( + "Query did not complete within configured timeout period" + ); + assertResultWithCustomPool(input, 10, 20, reportMetrics -> {}, customBadPool); + customBadPool.shutdown(); + } @Test public void testAllInSingleBatch() throws Exception @@ -586,6 +607,61 @@ public class ParallelMergeCombiningSequenceTest ); } + private void assertResultWithCustomPool( + List<Sequence<IntPair>> sequences, + int batchSize, + int yieldAfter, + Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> reporter, + ForkJoinPool customPool + ) + throws InterruptedException, IOException + { 
+ final CombiningSequence<IntPair> combiningSequence = CombiningSequence.create( + new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)), + INT_PAIR_ORDERING, + INT_PAIR_MERGE_FN + ); + + final ParallelMergeCombiningSequence<IntPair> parallelMergeCombineSequence = new ParallelMergeCombiningSequence<>( + customPool, + sequences, + INT_PAIR_ORDERING, + INT_PAIR_MERGE_FN, + true, + 5000, + 0, + TEST_POOL_SIZE, + yieldAfter, + batchSize, + ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS, + reporter + ); + + Yielder<IntPair> combiningYielder = Yielders.each(combiningSequence); + Yielder<IntPair> parallelMergeCombineYielder = Yielders.each(parallelMergeCombineSequence); + + IntPair prev = null; + + while (!combiningYielder.isDone() && !parallelMergeCombineYielder.isDone()) { + Assert.assertEquals(combiningYielder.get(), parallelMergeCombineYielder.get()); + Assert.assertNotEquals(parallelMergeCombineYielder.get(), prev); + prev = parallelMergeCombineYielder.get(); + combiningYielder = combiningYielder.next(combiningYielder.get()); + parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get()); + } + + Assert.assertTrue(combiningYielder.isDone()); + Assert.assertTrue(parallelMergeCombineYielder.isDone()); + while (pool.getRunningThreadCount() > 0) { + Thread.sleep(100); + } + Assert.assertEquals(0, pool.getRunningThreadCount()); + combiningYielder.close(); + parallelMergeCombineYielder.close(); + // cancellation trigger should not be set if sequence was fully yielded and close is called + // (though shouldn't actually matter even if it was...) + Assert.assertFalse(parallelMergeCombineSequence.getCancellationGizmo().isCancelled()); + } private void assertResult( List<Sequence<IntPair>> sequences, int batchSize, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org