This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 7ea55e9  [FLINK-12595][kinesis] Interrupt thread at right time to 
avoid deadlock
7ea55e9 is described below

commit 7ea55e967bc450b3b744edcaea23834646e439cd
Author: Shannon Carey <rehevk...@gmail.com>
AuthorDate: Sat Jul 20 14:15:50 2019 -0500

    [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock
    
    - Inside testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown,
    consumerThread.interrupt() was getting absorbed inside
    KinesisDataFetcher's while(running) loop, therefore
    TestableKinesisDataFetcherForShardConsumerException's awaitTermination()
    wasn't getting interrupted by it. This led to deadlock, with
    KinesisDataFetcher waiting on the test code to send the interrupt, and
    the test code waiting for KinesisDataFetcher to throw the expected
    exception.
    - Now, the test code waits until KinesisDataFetcher is inside
    awaitTermination() before producing the interrupt, so it can be sure
    that the interrupt it produces will be received/handled inside
    awaitTermination().
---
 .../kinesis/internals/KinesisDataFetcherTest.java         |  6 +++++-
 ...stableKinesisDataFetcherForShardConsumerException.java | 15 ++++++++++++++-
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 5255e61..2815193 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -846,7 +846,7 @@ public class KinesisDataFetcherTest extends TestLogger {
                DummyFlinkKinesisConsumer<String> consumer = new 
DummyFlinkKinesisConsumer<>(
                        TestUtils.getStandardProperties(), fetcher, 1, 0);
 
-               CheckedThread consumerThread = new CheckedThread() {
+               CheckedThread consumerThread = new 
CheckedThread("FlinkKinesisConsumer") {
                        @Override
                        public void go() throws Exception {
                                consumer.run(new TestSourceContext<>());
@@ -858,6 +858,10 @@ public class KinesisDataFetcherTest extends TestLogger {
                // ShardConsumer exception (from deserializer) will result in 
fetcher being shut down.
                fetcher.waitUntilShutdown(20, TimeUnit.SECONDS);
 
+               // Ensure that KinesisDataFetcher has exited its while(running) 
loop and is inside its awaitTermination()
+               // method before we interrupt its thread, so that our interrupt 
doesn't get absorbed by any other mechanism.
+               fetcher.waitUntilAwaitTermination(20, TimeUnit.SECONDS);
+
                // Interrupt the thread so that 
KinesisDataFetcher#awaitTermination() will throw InterruptedException.
                consumerThread.interrupt();
 
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
index c08b7af..6ae4391 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -32,6 +33,8 @@ import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -39,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * {@link #awaitTermination()}.
  */
 public class TestableKinesisDataFetcherForShardConsumerException<T> extends 
TestableKinesisDataFetcher<T> {
+       public volatile boolean wasInterrupted = false;
+
+       private OneShotLatch awaitTerminationWaiter = new OneShotLatch();
+
        public TestableKinesisDataFetcherForShardConsumerException(final 
List<String> fakeStreams,
                        final SourceFunction.SourceContext<T> sourceContext,
                        final Properties fakeConfiguration,
@@ -54,7 +61,12 @@ public class 
TestableKinesisDataFetcherForShardConsumerException<T> extends Test
                        
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, fakeKinesis);
        }
 
-       public volatile boolean wasInterrupted = false;
+       /**
+        * Block until awaitTermination() has been called on this class.
+        */
+       public void waitUntilAwaitTermination(long timeout, TimeUnit timeUnit) 
throws InterruptedException, TimeoutException {
+               awaitTerminationWaiter.await(timeout, timeUnit);
+       }
 
        @Override
        protected ExecutorService createShardConsumersThreadPool(final String 
subtaskName) {
@@ -65,6 +77,7 @@ public class 
TestableKinesisDataFetcherForShardConsumerException<T> extends Test
 
        @Override
        public void awaitTermination() throws InterruptedException {
+               awaitTerminationWaiter.trigger();
                try {
                        // Force this method to only exit by thread getting 
interrupted.
                        while (true) {

Reply via email to