Repository: flink
Updated Branches:
  refs/heads/master 31641a68e -> 563e54623


[FLINK-1419] [runtime] Fix: distributed cache properly synchronized

This closes #339


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/563e5462
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/563e5462
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/563e5462

Branch: refs/heads/master
Commit: 563e546236217dace58a8031d56d08a27e08160b
Parents: 31641a6
Author: zentol <s.mo...@web.de>
Authored: Mon Jan 26 11:07:53 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Mon Feb 2 00:27:39 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/filecache/FileCache.java      | 56 ++++++++++++--------
 1 file changed, 34 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/563e5462/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index de8d59c..b838aa4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -28,8 +28,6 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.api.common.cache.DistributedCache;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.configuration.ConfigConstants;
@@ -60,7 +58,7 @@ public class FileCache {
        
        private LocalFileSystem lfs = new LocalFileSystem();
 
-       private Map<Pair<JobID, String>, Integer> count = new 
HashMap<Pair<JobID,String>, Integer>();
+       private Map<JobID, Map<String, Integer>> jobCounts = new HashMap<JobID, 
Map<String, Integer>>();
 
        private final ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
 
@@ -72,15 +70,18 @@ public class FileCache {
         * @return copy task
         */
        public FutureTask<Path> createTmpFile(String name, 
DistributedCacheEntry entry, JobID jobID) {
-               synchronized (count) {
-                       Pair<JobID, String> key = new ImmutablePair<JobID, 
String>(jobID, name);
-                       if (count.containsKey(key)) {
-                               count.put(key, count.get(key) + 1);
+               synchronized (lock) {
+                       if (!jobCounts.containsKey(jobID)) {
+                               jobCounts.put(jobID, new HashMap<String, 
Integer>());
+                       }
+                       Map<String, Integer> count = jobCounts.get(jobID);
+                       if (count.containsKey(name)) {
+                               count.put(name, count.get(name) + 1);
                        } else {
-                               count.put(key, 1);
+                               count.put(name, 1);
                        }
                }
-               CopyProcess cp = new CopyProcess(name, entry, jobID);
+               CopyProcess cp = new CopyProcess(entry, jobID);
                FutureTask<Path> copyTask = new FutureTask<Path>(cp);
                executorService.submit(copyTask);
                return copyTask;
@@ -93,7 +94,7 @@ public class FileCache {
         * @param jobID
         */
        public void deleteTmpFile(String name, DistributedCacheEntry entry, 
JobID jobID) {
-               DeleteProcess dp = new DeleteProcess(name, entry, jobID, 
count.get(new ImmutablePair<JobID, String>(jobID,name)));
+               DeleteProcess dp = new DeleteProcess(name, entry, jobID);
                executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
        }
 
@@ -153,7 +154,7 @@ public class FileCache {
                private String filePath;
                private Boolean executable;
 
-               public CopyProcess(String name, DistributedCacheEntry e, JobID 
jobID) {
+               public CopyProcess(DistributedCacheEntry e, JobID jobID) {
                        this.filePath = e.filePath;
                        this.executable = e.isExecutable;
                        this.jobID = jobID;
@@ -179,24 +180,35 @@ public class FileCache {
                
                private String name;
                private JobID jobID;
-               private int oldCount;
+               private String filePath;
 
-               public DeleteProcess(String name, DistributedCacheEntry e, 
JobID jobID, int c) {
+               public DeleteProcess(String name, DistributedCacheEntry e, 
JobID jobID) {
                        this.name = name;
                        this.jobID = jobID;
-                       this.oldCount = c;
+                       this.filePath = e.filePath;
                }
                @Override
                public void run() {
-                       synchronized (count) {
-                               if (count.get(new ImmutablePair<JobID, 
String>(jobID, name)) != oldCount) {
-                                       return;
-                               }
-                       }
-                       Path tmp = getTempDir(jobID, "");
+                       Path tmp = getTempDir(jobID, 
filePath.substring(filePath.lastIndexOf("/") + 1));
                        try {
-                               if (lfs.exists(tmp)) {
-                                       lfs.delete(tmp, true);
+                               synchronized (lock) {
+                                       Map<String, Integer> count = 
jobCounts.get(jobID);
+                                       if (count.containsKey(name)) {
+                                               count.put(name, count.get(name) 
- 1);
+                                               if (count.get(name) == 0) {
+                                                       if (lfs.exists(tmp)) {
+                                                               lfs.delete(tmp, 
true);
+                                                       }
+                                                       count.remove(name);
+                                                       if (count.isEmpty()) { 
//delete job directory
+                                                               tmp = 
getTempDir(jobID, "");
+                                                               if 
(lfs.exists(tmp)) {
+                                                                       
lfs.delete(tmp, true);
+                                                               }
+                                                               
jobCounts.remove(jobID);
+                                                       }
+                                               }
+                                       }
                                }
                        } catch (IOException e) {
                                LOG.error("Could not delete file from local 
file cache.", e);

Reply via email to