Repository: flink Updated Branches: refs/heads/master 31641a68e -> 563e54623
[FLINK-1419] [runtime] Fix: distributed cache properly synchronized This closes #339 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/563e5462 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/563e5462 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/563e5462 Branch: refs/heads/master Commit: 563e546236217dace58a8031d56d08a27e08160b Parents: 31641a6 Author: zentol <s.mo...@web.de> Authored: Mon Jan 26 11:07:53 2015 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Mon Feb 2 00:27:39 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/filecache/FileCache.java | 56 ++++++++++++-------- 1 file changed, 34 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/563e5462/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java index de8d59c..b838aa4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java @@ -28,8 +28,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.configuration.ConfigConstants; @@ -60,7 +58,7 @@ public class FileCache { private LocalFileSystem lfs = new LocalFileSystem(); - private Map<Pair<JobID, String>, Integer> count = new HashMap<Pair<JobID,String>, Integer>(); + private Map<JobID, Map<String, Integer>> jobCounts = new HashMap<JobID, Map<String, Integer>>(); private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE); @@ -72,15 +70,18 @@ public class FileCache { * @return copy task */ public FutureTask<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) { - synchronized (count) { - Pair<JobID, String> key = new ImmutablePair<JobID, String>(jobID, name); - if (count.containsKey(key)) { - count.put(key, count.get(key) + 1); + synchronized (lock) { + if (!jobCounts.containsKey(jobID)) { + jobCounts.put(jobID, new HashMap<String, Integer>()); + } + Map<String, Integer> count = jobCounts.get(jobID); + if (count.containsKey(name)) { + count.put(name, count.get(name) + 1); } else { - count.put(key, 1); + count.put(name, 1); } } - CopyProcess cp = new CopyProcess(name, entry, jobID); + CopyProcess cp = new CopyProcess(entry, jobID); FutureTask<Path> copyTask = new FutureTask<Path>(cp); executorService.submit(copyTask); return copyTask; @@ -93,7 +94,7 @@ public class FileCache { * @param jobID */ public void deleteTmpFile(String name, DistributedCacheEntry entry, JobID jobID) { - DeleteProcess dp = new DeleteProcess(name, entry, jobID, count.get(new ImmutablePair<JobID, String>(jobID,name))); + DeleteProcess dp = new DeleteProcess(name, entry, jobID); executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS); } @@ -153,7 +154,7 @@ public class FileCache { private String filePath; private Boolean executable; - public CopyProcess(String name, DistributedCacheEntry e, JobID jobID) { + public CopyProcess(DistributedCacheEntry e, JobID jobID) { this.filePath = e.filePath; this.executable = e.isExecutable; this.jobID = jobID; @@ -179,24 +180,35 @@ public class FileCache { private String name; private JobID jobID; - private int oldCount; + private String filePath; - public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) { + public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID) { this.name = name; this.jobID = jobID; - this.oldCount = c; + this.filePath = e.filePath; } @Override public void run() { - synchronized (count) { - if (count.get(new ImmutablePair<JobID, String>(jobID, name)) != oldCount) { - return; - } - } - Path tmp = getTempDir(jobID, ""); + Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1)); try { - if (lfs.exists(tmp)) { - lfs.delete(tmp, true); + synchronized (lock) { + Map<String, Integer> count = jobCounts.get(jobID); + if (count.containsKey(name)) { + count.put(name, count.get(name) - 1); + if (count.get(name) == 0) { + if (lfs.exists(tmp)) { + lfs.delete(tmp, true); + } + count.remove(name); + if (count.isEmpty()) { //delete job directory + tmp = getTempDir(jobID, ""); + if (lfs.exists(tmp)) { + lfs.delete(tmp, true); + } + jobCounts.remove(jobID); + } + } + } } } catch (IOException e) { LOG.error("Could not delete file from local file cache.", e);