alexeykudinkin commented on a change in pull request #4264:
URL: https://github.com/apache/hudi/pull/4264#discussion_r826327586



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
##########
@@ -152,6 +165,144 @@ protected Integer getResult() {
     } finally {
       if (executor != null) {
         executor.shutdownNow();
+        executor.awaitTermination();
+      }
+    }
+  }
+
+  @Test
+  public void testExecutorTermination() throws ExecutionException, InterruptedException {

Review comment:
       Appreciate your effort in putting up this test, but I usually suggest 
avoiding checking in non-deterministic tests: they pave the way for flakiness, 
given that we don't control the test execution environment (in CI or 
elsewhere).
   
   Let's instead either rewrite it as a deterministic test (covering both the 
positive and the negative case) by controlling the execution with 
`CountDownLatch` or `CyclicBarrier`, or keep just the positive case (which has 
to pass and should never fail).
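
To illustrate what controlling the execution with a latch could look like, here is a rough, standalone sketch. It is not tied to `BoundedInMemoryExecutor`: the class name `LatchControlledShutdown`, the method `workerStopsOnShutdown` and the 10 second upper bounds are all made up for illustration. The point is only that the test thread waits for a signal from the worker instead of sleeping for a fixed time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical example class, not part of Hudi.
public class LatchControlledShutdown {

  /**
   * Returns true only if the worker was provably running when shutdownNow()
   * was called and then terminated because of the resulting interrupt.
   */
  static boolean workerStopsOnShutdown() {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch neverReleased = new CountDownLatch(1);
    AtomicBoolean sawInterrupt = new AtomicBoolean(false);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.submit(() -> {
        started.countDown();        // tell the test thread that we are running
        try {
          neverReleased.await();    // block until somebody interrupts us
        } catch (InterruptedException e) {
          sawInterrupt.set(true);   // record why we woke up
        }
      });

      // No sleep: wait for the worker's signal. The timeout is only an upper
      // bound so that a broken test fails instead of hanging.
      if (!started.await(10, TimeUnit.SECONDS)) {
        return false;
      }
      pool.shutdownNow();           // interrupts the blocked worker
      boolean terminated = pool.awaitTermination(10, TimeUnit.SECONDS);
      return terminated && sawInterrupt.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow();           // idempotent cleanup on every path
    }
  }

  public static void main(String[] args) {
    System.out.println("worker stopped on shutdown: " + workerStopsOnShutdown());
  }
}
```

The outcome here does not depend on how fast the machine is: the interrupt is delivered either while the worker is blocked in `await()` or just before it gets there, and in both cases `await()` throws.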

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
##########
@@ -168,6 +169,35 @@ public boolean isRemaining() {
   public void shutdownNow() {
     producerExecutorService.shutdownNow();
     consumerExecutorService.shutdownNow();
+    // close queue to force producer stop
+    queue.close();
+  }
+
+  public boolean awaitTermination() {
+    boolean interruptedBefore = Thread.currentThread().isInterrupted();
+    boolean producerTerminated = false;
+    boolean consumerTerminated = false;
+    try {
+      producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+      consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+      return producerTerminated && consumerTerminated;
+    } catch (InterruptedException ie) {
+      if (!interruptedBefore) {
+        Thread.currentThread().interrupt();

Review comment:
       Why do we call `interrupt()` here?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
##########
@@ -152,6 +165,144 @@ protected Integer getResult() {
     } finally {
       if (executor != null) {
         executor.shutdownNow();
+        executor.awaitTermination();
+      }
+    }
+  }
+
+  @Test
+  public void testExecutorTermination() throws ExecutionException, InterruptedException {
+    // HUDI-2875: sleep time in this UT is designed deliberately. It represents the case that
+    // consumer is slower than producer and the queue connecting them is non-empty.
+    // firstly test a nonSafe usage
+    ExecutorService executionThread = Executors.newSingleThreadExecutor();
+    Future<Boolean> testResult = executionThread.submit(new ExecutorConcurrentUsageTask(false));
+    // let executor run some time
+    sleepUninterruptibly(2 * 1000);
+    executionThread.shutdownNow();
+    boolean concurrentSafe = !testResult.get();
+    assertFalse(concurrentSafe, "Should find concurrent issue");
+    // test a thread safe usage
+    executionThread = Executors.newSingleThreadExecutor();
+    testResult = executionThread.submit(new ExecutorConcurrentUsageTask(true));
+    sleepUninterruptibly(2 * 1000);
+    executionThread.shutdownNow();
+    concurrentSafe = !testResult.get();
+    assertTrue(concurrentSafe, "Should not find concurrent issue");
+  }
+
+  private static void sleepUninterruptibly(int milliseconds) {
+    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(milliseconds);
+    long end = System.nanoTime() + remainingNanos;
+    while (true) {
+      try {
+        TimeUnit.NANOSECONDS.sleep(remainingNanos);
+        return;
+      } catch (InterruptedException interruptedException) {
+        remainingNanos = end - System.nanoTime();
+      }
+    }
+  }
+
+  private class ExecutorConcurrentUsageTask implements Callable<Boolean> {
+    private final boolean correct;
+
+    private ExecutorConcurrentUsageTask(boolean correct) {
+      this.correct = correct;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+      when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024 * 1024);
+
+      Iterator<GenericRecord> unboundedRecordIter = new Iterator<GenericRecord>() {
+        private final Random random = new Random();

Review comment:
       Let's actually abstract all test data generation within 
`HoodieTestDataGenerator`.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
##########
@@ -168,6 +169,35 @@ public boolean isRemaining() {
   public void shutdownNow() {
     producerExecutorService.shutdownNow();
     consumerExecutorService.shutdownNow();
+    // close queue to force producer stop
+    queue.close();
+  }
+
+  public boolean awaitTermination() {
+    boolean interruptedBefore = Thread.currentThread().isInterrupted();
+    boolean producerTerminated = false;
+    boolean consumerTerminated = false;
+    try {
+      producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+      consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+      return producerTerminated && consumerTerminated;
+    } catch (InterruptedException ie) {
+      if (!interruptedBefore) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      // if current thread has been interrupted before awaitTermination was called.
+      // We still give executorService a chance to wait termination as
+      // what is wanted to be interrupted may not be the waiting process.
+      try {
+        producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+        consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+      } catch (InterruptedException expected) {
+        // awaiting process is interrupted again.
+      }
+      Thread.currentThread().interrupt();

Review comment:
       Here as well?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
##########
@@ -168,6 +169,35 @@ public boolean isRemaining() {
   public void shutdownNow() {
     producerExecutorService.shutdownNow();
     consumerExecutorService.shutdownNow();
+    // close queue to force producer stop
+    queue.close();
+  }
+
+  public boolean awaitTermination() {
+    boolean interruptedBefore = Thread.currentThread().isInterrupted();
+    boolean producerTerminated = false;
+    boolean consumerTerminated = false;
+    try {
+      producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+      consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+      return producerTerminated && consumerTerminated;
+    } catch (InterruptedException ie) {
+      if (!interruptedBefore) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      // if current thread has been interrupted before awaitTermination was called.

Review comment:
       If I understood your intention correctly, you want to give the executors 
a chance to shut down properly before proceeding, right? 
   
   You don't need to duplicate code for this: you can just call `interrupted` 
instead of `isInterrupted`. That clears the interrupted status flag, making 
sure that the following `awaitTermination` invocations won't fail (lines 181, 
182).

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
##########
@@ -152,6 +165,144 @@ protected Integer getResult() {
     } finally {
       if (executor != null) {
         executor.shutdownNow();
+        executor.awaitTermination();
+      }
+    }
+  }
+
+  @Test
+  public void testExecutorTermination() throws ExecutionException, InterruptedException {
+    // HUDI-2875: sleep time in this UT is designed deliberately. It represents the case that
+    // consumer is slower than producer and the queue connecting them is non-empty.
+    // firstly test a nonSafe usage
+    ExecutorService executionThread = Executors.newSingleThreadExecutor();
+    Future<Boolean> testResult = executionThread.submit(new ExecutorConcurrentUsageTask(false));
+    // let executor run some time
+    sleepUninterruptibly(2 * 1000);
+    executionThread.shutdownNow();
+    boolean concurrentSafe = !testResult.get();
+    assertFalse(concurrentSafe, "Should find concurrent issue");
+    // test a thread safe usage
+    executionThread = Executors.newSingleThreadExecutor();
+    testResult = executionThread.submit(new ExecutorConcurrentUsageTask(true));
+    sleepUninterruptibly(2 * 1000);
+    executionThread.shutdownNow();
+    concurrentSafe = !testResult.get();
+    assertTrue(concurrentSafe, "Should not find concurrent issue");
+  }
+
+  private static void sleepUninterruptibly(int milliseconds) {
+    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(milliseconds);
+    long end = System.nanoTime() + remainingNanos;
+    while (true) {
+      try {
+        TimeUnit.NANOSECONDS.sleep(remainingNanos);
+        return;
+      } catch (InterruptedException interruptedException) {
+        remainingNanos = end - System.nanoTime();
+      }
+    }
+  }
+
+  private class ExecutorConcurrentUsageTask implements Callable<Boolean> {
+    private final boolean correct;
+
+    private ExecutorConcurrentUsageTask(boolean correct) {
+      this.correct = correct;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+      when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024 * 1024);
+
+      Iterator<GenericRecord> unboundedRecordIter = new Iterator<GenericRecord>() {
+        private final Random random = new Random();

Review comment:
       Let's make the test reproducible: 
   
   1. Please add a static seed for `Random`
   2. Don't use `UUID.randomUUID` (you can take a look at how UUIDs are 
pseudo-randomly generated in `HoodieTestDataGenerator`)
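
For illustration, one way to get reproducible keys is sketched below. This is an assumption about what could work, not necessarily what `HoodieTestDataGenerator` does; the class `SeededTestData`, the `SEED` value and the helper names are invented for the example. It relies only on `java.util.Random` with a fixed seed and the public `UUID(long, long)` constructor.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.UUID;

// Hypothetical example class, not part of Hudi.
public class SeededTestData {

  // Fixed seed (arbitrary value) so that every run produces the same sequence.
  private static final long SEED = 42L;

  /**
   * Builds a UUID from the seeded generator instead of UUID.randomUUID().
   * Note: the version/variant bits are not set, which is fine for test keys.
   */
  static UUID nextUuid(Random random) {
    return new UUID(random.nextLong(), random.nextLong());
  }

  /** Generates n record keys that are identical across invocations. */
  static String[] generateKeys(int n) {
    Random random = new Random(SEED);
    String[] keys = new String[n];
    for (int i = 0; i < n; i++) {
      keys[i] = nextUuid(random).toString();
    }
    return keys;
  }

  public static void main(String[] args) {
    String[] first = generateKeys(3);
    String[] second = generateKeys(3);
    System.out.println("same keys on both runs: " + Arrays.equals(first, second));
  }
}
```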




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.