This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a617aeedf9a3db5bee03b18bf0147a9613a9268c
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Thu Dec 5 16:23:17 2019 +0100

    [hotfix][test] Rename and reformat testCancellationFailsWithBlockingLock
    
    The new name testCanceleablesCanceledOnCancelTaskError better reflects what the test is doing,
    while moving CancelFailingTask closer to the test case makes reading the test easier.
---
 .../streaming/runtime/tasks/StreamTaskTest.java    | 188 ++++++++++-----------
 1 file changed, 94 insertions(+), 94 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 7719259..9917301 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -315,7 +315,7 @@ public class StreamTaskTest extends TestLogger {
        }
 
        @Test
-       public void testCancellationFailsWithBlockingLock() throws Exception {
+       public void testCanceleablesCanceledOnCancelTaskError() throws 
Exception {
                syncLatch = new OneShotLatch();
 
                StreamConfig cfg = new StreamConfig(new Configuration());
@@ -334,6 +334,99 @@ public class StreamTaskTest extends TestLogger {
                assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        }
 
+       /**
+        * A task that locks for ever, fail in {@link #cancelTask()}. It can be 
only shut down cleanly
+        * if {@link StreamTask#getCancelables()} are closed properly.
+        */
+       public static class CancelFailingTask extends StreamTask<String, 
AbstractStreamOperator<String>> {
+
+               public CancelFailingTask(Environment env) {
+                       super(env);
+               }
+
+               @Override
+               protected void init() {}
+
+               @Override
+               protected void processInput(MailboxDefaultAction.Controller 
controller) throws Exception {
+                       final OneShotLatch latch = new OneShotLatch();
+                       final Object lock = new Object();
+
+                       LockHolder holder = new LockHolder(lock, latch);
+                       holder.start();
+                       try {
+                               // cancellation should try and cancel this
+                               getCancelables().registerCloseable(holder);
+
+                               // wait till the lock holder has the lock
+                               latch.await();
+
+                               // we are at the point where cancelling can 
happen
+                               syncLatch.trigger();
+
+                               // try to acquire the lock - this is not 
possible as long as the lock holder
+                               // thread lives
+                               //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                               synchronized (lock) {
+                                       // nothing
+                               }
+                       }
+                       finally {
+                               holder.close();
+                       }
+                       controller.allActionsCompleted();
+               }
+
+               @Override
+               protected void cleanup() {}
+
+               @Override
+               protected void cancelTask() throws Exception {
+                       throw new Exception("test exception");
+               }
+
+               /**
+                * A thread that holds a lock as long as it lives.
+                */
+               private static final class LockHolder extends Thread implements 
Closeable {
+
+                       private final OneShotLatch trigger;
+                       private final Object lock;
+                       private volatile boolean canceled;
+
+                       private LockHolder(Object lock, OneShotLatch trigger) {
+                               this.lock = lock;
+                               this.trigger = trigger;
+                       }
+
+                       @Override
+                       public void run() {
+                               synchronized (lock) {
+                                       while (!canceled) {
+                                               // signal that we grabbed the 
lock
+                                               trigger.trigger();
+
+                                               // basically freeze this thread
+                                               try {
+                                                       //noinspection 
SleepWhileHoldingLock
+                                                       
Thread.sleep(1000000000);
+                                               } catch (InterruptedException 
ignored) {}
+                                       }
+                               }
+                       }
+
+                       public void cancel() {
+                               canceled = true;
+                       }
+
+                       @Override
+                       public void close() {
+                               canceled = true;
+                               interrupt();
+                       }
+               }
+       }
+
        @Test
        public void testDecliningCheckpointStreamOperator() throws Exception {
                DeclineDummyEnvironment declineDummyEnvironment = new 
DeclineDummyEnvironment();
@@ -1360,58 +1453,6 @@ public class StreamTaskTest extends TestLogger {
                }
        }
 
-       /**
-        * A task that locks if cancellation attempts to cleanly shut down.
-        */
-       public static class CancelFailingTask extends StreamTask<String, 
AbstractStreamOperator<String>> {
-
-               public CancelFailingTask(Environment env) {
-                       super(env);
-               }
-
-               @Override
-               protected void init() {}
-
-               @Override
-               protected void processInput(MailboxDefaultAction.Controller 
controller) throws Exception {
-                       final OneShotLatch latch = new OneShotLatch();
-                       final Object lock = new Object();
-
-                       LockHolder holder = new LockHolder(lock, latch);
-                       holder.start();
-                       try {
-                               // cancellation should try and cancel this
-                               getCancelables().registerCloseable(holder);
-
-                               // wait till the lock holder has the lock
-                               latch.await();
-
-                               // we are at the point where cancelling can 
happen
-                               syncLatch.trigger();
-
-                               // try to acquire the lock - this is not 
possible as long as the lock holder
-                               // thread lives
-                               //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                               synchronized (lock) {
-                                       // nothing
-                               }
-                       }
-                       finally {
-                               holder.close();
-                       }
-                       controller.allActionsCompleted();
-               }
-
-               @Override
-               protected void cleanup() {}
-
-               @Override
-               protected void cancelTask() throws Exception {
-                       throw new Exception("test exception");
-               }
-
-       }
-
        private static class ThreadInspectingTask extends StreamTask<String, 
AbstractStreamOperator<String>> {
 
                private final long taskThreadId;
@@ -1482,47 +1523,6 @@ public class StreamTaskTest extends TestLogger {
        // 
------------------------------------------------------------------------
        // 
------------------------------------------------------------------------
 
-       /**
-        * A thread that holds a lock as long as it lives.
-        */
-       private static final class LockHolder extends Thread implements 
Closeable {
-
-               private final OneShotLatch trigger;
-               private final Object lock;
-               private volatile boolean canceled;
-
-               private LockHolder(Object lock, OneShotLatch trigger) {
-                       this.lock = lock;
-                       this.trigger = trigger;
-               }
-
-               @Override
-               public void run() {
-                       synchronized (lock) {
-                               while (!canceled) {
-                                       // signal that we grabbed the lock
-                                       trigger.trigger();
-
-                                       // basically freeze this thread
-                                       try {
-                                               //noinspection 
SleepWhileHoldingLock
-                                               Thread.sleep(1000000000);
-                                       } catch (InterruptedException ignored) 
{}
-                               }
-                       }
-               }
-
-               public void cancel() {
-                       canceled = true;
-               }
-
-               @Override
-               public void close() {
-                       canceled = true;
-                       interrupt();
-               }
-       }
-
        static class TestStreamSource<OUT, SRC extends SourceFunction<OUT>> 
extends StreamSource<OUT, SRC> {
 
                static AbstractKeyedStateBackend<?> keyedStateBackend;

Reply via email to