This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e426d370ea Start with solo accumulator and empty partition (#14426)
e426d370ea is described below

commit e426d370ea974546fdb7fe6f43e44fba015181d7
Author: Pranav <pranavbh...@gmail.com>
AuthorDate: Wed Jun 14 16:20:48 2023 -0700

    Start with solo accumulator and empty partition (#14426)
    
    * Starting parallel merge with solo accumulator and empty partitions
    
    * shut down pool in test
---
 .../guava/ParallelMergeCombiningSequence.java      |  4 ++
 .../guava/ParallelMergeCombiningSequenceTest.java  | 78 +++++++++++++++++++++-
 2 files changed, 81 insertions(+), 1 deletion(-)

diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
 
b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
index 9c39c29d06..1f09fc9163 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
@@ -127,6 +127,10 @@ public class ParallelMergeCombiningSequence<T> extends 
YieldingSequenceBase<T>
     // less chance of blocking the merge
     final BlockingQueue<ResultBatch<T>> outputQueue = new 
ArrayBlockingQueue<>(4 * queueSize);
     final MergeCombineMetricsAccumulator metricsAccumulator = new 
MergeCombineMetricsAccumulator(inputSequences.size());
+    // Starting with empty partitionMetrics
+    metricsAccumulator.setPartitions(Collections.emptyList());
+    // starting with solo merge accumulator
+    metricsAccumulator.setMergeMetrics(new 
MergeCombineActionMetricsAccumulator());
     MergeCombinePartitioningAction<T> mergeCombineAction = new 
MergeCombinePartitioningAction<>(
         inputSequences,
         orderingFn,
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java
index 057ffcf0fb..49bd361402 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java
@@ -109,7 +109,7 @@ public class ParallelMergeCombiningSequenceTest
   }
 
   @Test
-  public void testOrderedResultBatchFromSequenceBackToYielderOnSequence() 
throws IOException
+  public void testOrderedResultBatchFromSequenceBacktoYielderOnSequence() 
throws IOException
   {
     final int batchSize = 128;
     final int sequenceSize = 5_000;
@@ -285,6 +285,27 @@ public class ParallelMergeCombiningSequenceTest
     input.add(nonBlockingSequence(5));
     assertResult(input);
   }
+  @Test
+  public void testMergeCombineMetricsAccumulatorNPEOnBadExecutorPool() throws 
Exception
+  {
+    // below min threshold, so will merge serially
+    List<Sequence<IntPair>> input = new ArrayList<>();
+    input.add(nonBlockingSequence(5));
+    input.add(nonBlockingSequence(6));
+    // Simulates the bad/occupied executor pool, it does not execute any task 
submitted to it
+    ForkJoinPool customBadPool = new ForkJoinPool(
+        1,
+        pool -> null,
+        (t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
+        true
+    );
+    expectedException.expect(QueryTimeoutException.class);
+    expectedException.expectMessage(
+        "Query did not complete within configured timeout period"
+    );
+    assertResultWithCustomPool(input, 10, 20, reportMetrics -> {}, 
customBadPool);
+    customBadPool.shutdown();
+  }
 
   @Test
   public void testAllInSingleBatch() throws Exception
@@ -586,6 +607,61 @@ public class ParallelMergeCombiningSequenceTest
     );
   }
 
+  private void assertResultWithCustomPool(
+          List<Sequence<IntPair>> sequences,
+          int batchSize,
+          int yieldAfter,
+          Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> 
reporter,
+          ForkJoinPool customPool
+  )
+          throws InterruptedException, IOException
+  {
+    final CombiningSequence<IntPair> combiningSequence = 
CombiningSequence.create(
+            new MergeSequence<>(INT_PAIR_ORDERING, 
Sequences.simple(sequences)),
+            INT_PAIR_ORDERING,
+            INT_PAIR_MERGE_FN
+    );
+
+    final ParallelMergeCombiningSequence<IntPair> parallelMergeCombineSequence 
= new ParallelMergeCombiningSequence<>(
+            customPool,
+            sequences,
+            INT_PAIR_ORDERING,
+            INT_PAIR_MERGE_FN,
+            true,
+            5000,
+            0,
+            TEST_POOL_SIZE,
+            yieldAfter,
+            batchSize,
+            ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
+            reporter
+    );
+
+    Yielder<IntPair> combiningYielder = Yielders.each(combiningSequence);
+    Yielder<IntPair> parallelMergeCombineYielder = 
Yielders.each(parallelMergeCombineSequence);
+
+    IntPair prev = null;
+
+    while (!combiningYielder.isDone() && 
!parallelMergeCombineYielder.isDone()) {
+      Assert.assertEquals(combiningYielder.get(), 
parallelMergeCombineYielder.get());
+      Assert.assertNotEquals(parallelMergeCombineYielder.get(), prev);
+      prev = parallelMergeCombineYielder.get();
+      combiningYielder = combiningYielder.next(combiningYielder.get());
+      parallelMergeCombineYielder = 
parallelMergeCombineYielder.next(parallelMergeCombineYielder.get());
+    }
+
+    Assert.assertTrue(combiningYielder.isDone());
+    Assert.assertTrue(parallelMergeCombineYielder.isDone());
+    while (pool.getRunningThreadCount() > 0) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(0, pool.getRunningThreadCount());
+    combiningYielder.close();
+    parallelMergeCombineYielder.close();
+    // cancellation trigger should not be set if sequence was fully yielded 
and close is called
+    // (though shouldn't actually matter even if it was...)
+    
Assert.assertFalse(parallelMergeCombineSequence.getCancellationGizmo().isCancelled());
+  }
   private void assertResult(
       List<Sequence<IntPair>> sequences,
       int batchSize,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to