Repository: flink
Updated Branches:
  refs/heads/master d160b5e56 -> 24408e190


[FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to 
prevent memory leaks

This closes #3548


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70252f34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70252f34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70252f34

Branch: refs/heads/master
Commit: 70252f3468916758e8bc456bbf482549c38ad7ff
Parents: afd36f9
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 16:44:41 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 79 +++++++++++++-------
 .../runtime/checkpoint/PendingCheckpoint.java   | 38 ++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   | 16 +++-
 .../checkpoint/PendingCheckpointTest.java       | 18 +++++
 4 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0592e3d..cc60837 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
 
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +50,10 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -134,7 +136,7 @@ public class CheckpointCoordinator {
        private final int maxConcurrentCheckpointAttempts;
 
        /** The timer that handles the checkpoint timeouts and triggers 
periodic checkpoints */
-       private final Timer timer;
+       private final ScheduledThreadPoolExecutor timer;
 
        /** Actor that receives status updates from the execution graph this 
coordinator works for */
        private JobStatusListener jobStatusListener;
@@ -142,7 +144,8 @@ public class CheckpointCoordinator {
        /** The number of consecutive failed trigger attempts */
        private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new 
AtomicInteger(0);
 
-       private ScheduledTrigger currentPeriodicTrigger;
+       /** A handle to the current periodic trigger, to cancel it when 
necessary */
+       private ScheduledFuture<?> currentPeriodicTrigger;
 
        /** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed */
        private long lastCheckpointCompletionNanos;
@@ -218,7 +221,13 @@ public class CheckpointCoordinator {
 
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 
-               this.timer = new Timer("Checkpoint Timer", true);
+               this.timer = new ScheduledThreadPoolExecutor(1,
+                               new 
DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint 
Timer"));
+
+               // make sure the timer internally cleans up and does not hold 
onto stale scheduled tasks
+               this.timer.setRemoveOnCancelPolicy(true);
+               
this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+               
this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 
                if (externalizeSettings.externalizeCheckpoints()) {
                        LOG.info("Persisting periodic checkpoints externally at 
{}.", checkpointDirectory);
@@ -265,7 +274,7 @@ public class CheckpointCoordinator {
                                triggerRequestQueued = false;
 
                                // shut down the thread that handles the 
timeouts and pending triggers
-                               timer.cancel();
+                               timer.shutdownNow();
 
                                // clear and discard all pending checkpoints
                                for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
@@ -392,7 +401,7 @@ public class CheckpointCoordinator {
                                if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
                                        triggerRequestQueued = true;
                                        if (currentPeriodicTrigger != null) {
-                                               currentPeriodicTrigger.cancel();
+                                               
currentPeriodicTrigger.cancel(false);
                                                currentPeriodicTrigger = null;
                                        }
                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -404,13 +413,14 @@ public class CheckpointCoordinator {
 
                                if (durationTillNextMillis > 0) {
                                        if (currentPeriodicTrigger != null) {
-                                               currentPeriodicTrigger.cancel();
+                                               
currentPeriodicTrigger.cancel(false);
                                                currentPeriodicTrigger = null;
                                        }
-                                       ScheduledTrigger trigger = new 
ScheduledTrigger();
                                        // Reassign the new trigger to the 
currentPeriodicTrigger
-                                       currentPeriodicTrigger = trigger;
-                                       timer.scheduleAtFixedRate(trigger, 
durationTillNextMillis, baseInterval);
+                                       currentPeriodicTrigger = 
timer.scheduleAtFixedRate(
+                                                       new ScheduledTrigger(),
+                                                       durationTillNextMillis, 
baseInterval, TimeUnit.MILLISECONDS);
+
                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                                }
                        }
@@ -483,7 +493,7 @@ public class CheckpointCoordinator {
                        }
 
                        // schedule the timer that will clean up the expired 
checkpoints
-                       TimerTask canceller = new TimerTask() {
+                       final Runnable canceller = new Runnable() {
                                @Override
                                public void run() {
                                        synchronized (lock) {
@@ -519,7 +529,7 @@ public class CheckpointCoordinator {
                                                if (pendingCheckpoints.size() 
>= maxConcurrentCheckpointAttempts) {
                                                        triggerRequestQueued = 
true;
                                                        if 
(currentPeriodicTrigger != null) {
-                                                               
currentPeriodicTrigger.cancel();
+                                                               
currentPeriodicTrigger.cancel(false);
                                                                
currentPeriodicTrigger = null;
                                                        }
                                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -531,14 +541,15 @@ public class CheckpointCoordinator {
 
                                                if (durationTillNextMillis > 0) 
{
                                                        if 
(currentPeriodicTrigger != null) {
-                                                               
currentPeriodicTrigger.cancel();
+                                                               
currentPeriodicTrigger.cancel(false);
                                                                
currentPeriodicTrigger = null;
                                                        }
 
-                                                       ScheduledTrigger 
trigger = new ScheduledTrigger();
                                                        // Reassign the new 
trigger to the currentPeriodicTrigger
-                                                       currentPeriodicTrigger 
= trigger;
-                                                       
timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+                                                       currentPeriodicTrigger 
= timer.scheduleAtFixedRate(
+                                                                       new 
ScheduledTrigger(),
+                                                                       
durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
                                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                                                }
                                        }
@@ -546,7 +557,15 @@ public class CheckpointCoordinator {
                                        LOG.info("Triggering checkpoint " + 
checkpointID + " @ " + timestamp);
 
                                        pendingCheckpoints.put(checkpointID, 
checkpoint);
-                                       timer.schedule(canceller, 
checkpointTimeout);
+
+                                       ScheduledFuture<?> cancellerHandle = 
timer.schedule(
+                                                       canceller,
+                                                       checkpointTimeout, 
TimeUnit.MILLISECONDS);
+
+                                       if 
(!checkpoint.setCancellerHandle(cancellerHandle)) {
+                                               // checkpoint is already 
disposed!
+                                               cancellerHandle.cancel(false);
+                                       }
                                }
                                // end of lock scope
 
@@ -866,20 +885,25 @@ public class CheckpointCoordinator {
 
                        // trigger the checkpoint from the trigger timer, to 
finish the work of this thread before
                        // starting with the next checkpoint
-                       ScheduledTrigger trigger = new ScheduledTrigger();
                        if (periodicScheduling) {
                                if (currentPeriodicTrigger != null) {
-                                       currentPeriodicTrigger.cancel();
+                                       currentPeriodicTrigger.cancel(false);
                                }
-                               currentPeriodicTrigger = trigger;
-                               timer.scheduleAtFixedRate(trigger, 0L, 
baseInterval);
+                               currentPeriodicTrigger = 
timer.scheduleAtFixedRate(
+                                               new ScheduledTrigger(),
+                                               0L, baseInterval, 
TimeUnit.MILLISECONDS);
                        }
                        else {
-                               timer.schedule(trigger, 0L);
+                               timer.execute(new ScheduledTrigger());
                        }
                }
        }
 
+       @VisibleForTesting
+       int getNumScheduledTasks() {
+               return timer.getQueue().size();
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Checkpoint State Restoring
        // 
--------------------------------------------------------------------------------------------
@@ -1006,8 +1030,9 @@ public class CheckpointCoordinator {
                        stopCheckpointScheduler();
 
                        periodicScheduling = true;
-                       currentPeriodicTrigger = new ScheduledTrigger();
-                       timer.scheduleAtFixedRate(currentPeriodicTrigger, 
baseInterval, baseInterval);
+                       currentPeriodicTrigger = timer.scheduleAtFixedRate(
+                                       new ScheduledTrigger(), 
+                                       baseInterval, baseInterval, 
TimeUnit.MILLISECONDS);
                }
        }
 
@@ -1017,7 +1042,7 @@ public class CheckpointCoordinator {
                        periodicScheduling = false;
 
                        if (currentPeriodicTrigger != null) {
-                               currentPeriodicTrigger.cancel();
+                               currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                        }
 
@@ -1050,7 +1075,7 @@ public class CheckpointCoordinator {
 
        // 
------------------------------------------------------------------------
 
-       private class ScheduledTrigger extends TimerTask {
+       private final class ScheduledTrigger implements Runnable {
 
                @Override
                public void run() {

http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 5ca6040..b7eb037 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -46,6 +46,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -99,6 +100,8 @@ public class PendingCheckpoint {
        @Nullable
        private PendingCheckpointStats statsCallback;
 
+       private volatile ScheduledFuture<?> cancellerHandle;
+
        // 
--------------------------------------------------------------------------------------------
 
        public PendingCheckpoint(
@@ -197,6 +200,27 @@ public class PendingCheckpoint {
                this.statsCallback = trackerCallback;
        }
 
+       /**
+        * Sets the handle for the canceller to this pending checkpoint.
+        *
+        * @return true, if the handle was set, false, if the checkpoint is 
already disposed.
+        */
+       public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
+               synchronized (lock) {
+                       if (this.cancellerHandle == null) {
+                               if (!discarded) {
+                                       this.cancellerHandle = cancellerHandle;
+                                       return true;
+                               } else {
+                                       return false;
+                               }
+                       }
+                       else {
+                               throw new IllegalStateException("A canceller 
handle was already set");
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Progress and Completion
        // 
------------------------------------------------------------------------
@@ -490,10 +514,24 @@ public class PendingCheckpoint {
                                discarded = true;
                                notYetAcknowledgedTasks.clear();
                                acknowledgedTasks.clear();
+                               cancelCanceller();
                        }
                }
        }
 
+       private void cancelCanceller() {
+               try {
+                       final ScheduledFuture<?> canceller = 
this.cancellerHandle;
+                       if (canceller != null) {
+                               canceller.cancel(false);
+                       }
+               }
+               catch (Exception e) {
+                       // this code should not throw exceptions
+                       LOG.warn("Error while cancelling checkpoint timeout 
task", e);
+               }
+       }
+
        /**
         * Reports a failed checkpoint with the given optional cause.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1691370..d8bba59 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,6 +299,9 @@ public class CheckpointCoordinatorTest {
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
+                       // we have one task scheduled that will cancel after 
timeout
+                       assertEquals(1, coord.getNumScheduledTasks());
+
                        long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                        PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
 
@@ -336,6 +339,9 @@ public class CheckpointCoordinatorTest {
                        coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
                        assertTrue(checkpoint.isDiscarded());
 
+                       // the canceller is also removed
+                       assertEquals(0, coord.getNumScheduledTasks());
+
                        // validate that we have no new pending checkpoint
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -389,6 +395,7 @@ public class CheckpointCoordinatorTest {
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, coord.getNumScheduledTasks());
 
                        // trigger the first checkpoint. this should succeed
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -399,6 +406,7 @@ public class CheckpointCoordinatorTest {
                        // validate that we have a pending checkpoint
                        assertEquals(2, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(2, coord.getNumScheduledTasks());
 
                        Iterator<Map.Entry<Long, PendingCheckpoint>> it = 
coord.getPendingCheckpoints().entrySet().iterator();
                        long checkpoint1Id = it.next().getKey();
@@ -439,13 +447,13 @@ public class CheckpointCoordinatorTest {
                        }
 
                        // decline checkpoint from one of the tasks, this 
should cancel the checkpoint
-                       // and trigger a new one
                        coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id));
                        assertTrue(checkpoint1.isDiscarded());
 
                        // validate that we have only one pending checkpoint 
left
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(1, coord.getNumScheduledTasks());
 
                        // validate that it is the same second checkpoint from 
earlier
                        long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
@@ -506,6 +514,7 @@ public class CheckpointCoordinatorTest {
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, coord.getNumScheduledTasks());
 
                        // trigger the first checkpoint. this should succeed
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -513,6 +522,7 @@ public class CheckpointCoordinatorTest {
                        // validate that we have a pending checkpoint
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(1, coord.getNumScheduledTasks());
 
                        long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                        PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
@@ -556,6 +566,9 @@ public class CheckpointCoordinatorTest {
                        assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
+                       // the canceller should be removed now
+                       assertEquals(0, coord.getNumScheduledTasks());
+
                        // validate that the relevant tasks got a confirmation 
message
                        {
                                verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), 
any(CheckpointOptions.class));
@@ -580,6 +593,7 @@ public class CheckpointCoordinatorTest {
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, coord.getNumScheduledTasks());
 
                        CompletedCheckpoint successNew = 
coord.getSuccessfulCheckpoints().get(0);
                        assertEquals(jid, successNew.getJobId());

http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 6f04f39..55b5fe0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -287,6 +288,23 @@ public class PendingCheckpointTest {
                }
        }
 
+       @Test
+       public void testSetCanceller() {
+               final CheckpointProperties props = new 
CheckpointProperties(false, false, true, true, true, true, true);
+
+               PendingCheckpoint aborted = createPendingCheckpoint(props, 
null);
+               aborted.abortDeclined();
+               assertTrue(aborted.isDiscarded());
+               
assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
+
+               PendingCheckpoint pending = createPendingCheckpoint(props, 
null);
+               ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
+
+               assertTrue(pending.setCancellerHandle(canceller));
+               pending.abortDeclined();
+               verify(canceller).cancel(false);
+       }
+
        // 
------------------------------------------------------------------------
 
        private static PendingCheckpoint 
createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {

Reply via email to