This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a617aeedf9a3db5bee03b18bf0147a9613a9268c Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Thu Dec 5 16:23:17 2019 +0100 [hotfix][test] Rename and reformat testCancellationFailsWithBlockingLock. The new name, testCanceleablesCanceledOnCancelTaskError, better reflects what the test is doing, while moving CancelFailingTask closer to the test case makes the test easier to read. --- .../streaming/runtime/tasks/StreamTaskTest.java | 188 ++++++++++----------- 1 file changed, 94 insertions(+), 94 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 7719259..9917301 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -315,7 +315,7 @@ public class StreamTaskTest extends TestLogger { } @Test - public void testCancellationFailsWithBlockingLock() throws Exception { + public void testCanceleablesCanceledOnCancelTaskError() throws Exception { syncLatch = new OneShotLatch(); StreamConfig cfg = new StreamConfig(new Configuration()); @@ -334,6 +334,99 @@ public class StreamTaskTest extends TestLogger { assertEquals(ExecutionState.CANCELED, task.getExecutionState()); } + /** + * A task that locks for ever, fail in {@link #cancelTask()}. It can be only shut down cleanly + * if {@link StreamTask#getCancelables()} are closed properly.
+ */ + public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> { + + public CancelFailingTask(Environment env) { + super(env); + } + + @Override + protected void init() {} + + @Override + protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { + final OneShotLatch latch = new OneShotLatch(); + final Object lock = new Object(); + + LockHolder holder = new LockHolder(lock, latch); + holder.start(); + try { + // cancellation should try and cancel this + getCancelables().registerCloseable(holder); + + // wait till the lock holder has the lock + latch.await(); + + // we are at the point where cancelling can happen + syncLatch.trigger(); + + // try to acquire the lock - this is not possible as long as the lock holder + // thread lives + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (lock) { + // nothing + } + } + finally { + holder.close(); + } + controller.allActionsCompleted(); + } + + @Override + protected void cleanup() {} + + @Override + protected void cancelTask() throws Exception { + throw new Exception("test exception"); + } + + /** + * A thread that holds a lock as long as it lives. 
+ */ + private static final class LockHolder extends Thread implements Closeable { + + private final OneShotLatch trigger; + private final Object lock; + private volatile boolean canceled; + + private LockHolder(Object lock, OneShotLatch trigger) { + this.lock = lock; + this.trigger = trigger; + } + + @Override + public void run() { + synchronized (lock) { + while (!canceled) { + // signal that we grabbed the lock + trigger.trigger(); + + // basically freeze this thread + try { + //noinspection SleepWhileHoldingLock + Thread.sleep(1000000000); + } catch (InterruptedException ignored) {} + } + } + } + + public void cancel() { + canceled = true; + } + + @Override + public void close() { + canceled = true; + interrupt(); + } + } + } + @Test public void testDecliningCheckpointStreamOperator() throws Exception { DeclineDummyEnvironment declineDummyEnvironment = new DeclineDummyEnvironment(); @@ -1360,58 +1453,6 @@ public class StreamTaskTest extends TestLogger { } } - /** - * A task that locks if cancellation attempts to cleanly shut down. 
- */ - public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> { - - public CancelFailingTask(Environment env) { - super(env); - } - - @Override - protected void init() {} - - @Override - protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { - final OneShotLatch latch = new OneShotLatch(); - final Object lock = new Object(); - - LockHolder holder = new LockHolder(lock, latch); - holder.start(); - try { - // cancellation should try and cancel this - getCancelables().registerCloseable(holder); - - // wait till the lock holder has the lock - latch.await(); - - // we are at the point where cancelling can happen - syncLatch.trigger(); - - // try to acquire the lock - this is not possible as long as the lock holder - // thread lives - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (lock) { - // nothing - } - } - finally { - holder.close(); - } - controller.allActionsCompleted(); - } - - @Override - protected void cleanup() {} - - @Override - protected void cancelTask() throws Exception { - throw new Exception("test exception"); - } - - } - private static class ThreadInspectingTask extends StreamTask<String, AbstractStreamOperator<String>> { private final long taskThreadId; @@ -1482,47 +1523,6 @@ public class StreamTaskTest extends TestLogger { // ------------------------------------------------------------------------ // ------------------------------------------------------------------------ - /** - * A thread that holds a lock as long as it lives. 
- */ - private static final class LockHolder extends Thread implements Closeable { - - private final OneShotLatch trigger; - private final Object lock; - private volatile boolean canceled; - - private LockHolder(Object lock, OneShotLatch trigger) { - this.lock = lock; - this.trigger = trigger; - } - - @Override - public void run() { - synchronized (lock) { - while (!canceled) { - // signal that we grabbed the lock - trigger.trigger(); - - // basically freeze this thread - try { - //noinspection SleepWhileHoldingLock - Thread.sleep(1000000000); - } catch (InterruptedException ignored) {} - } - } - } - - public void cancel() { - canceled = true; - } - - @Override - public void close() { - canceled = true; - interrupt(); - } - } - static class TestStreamSource<OUT, SRC extends SourceFunction<OUT>> extends StreamSource<OUT, SRC> { static AbstractKeyedStateBackend<?> keyedStateBackend;