Author: tgraves
Date: Fri Aug 17 19:47:35 2012
New Revision: 1374407

URL: http://svn.apache.org/viewvc?rev=1374407&view=rev
Log:
MAPREDUCE-4549. Distributed cache conflicts breaks backwards compatability 
(Robert Evans via tgraves)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
    
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java

Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1374407&r1=1374406&r2=1374407&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt 
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri 
Aug 17 19:47:35 2012
@@ -392,6 +392,9 @@ Release 0.23.3 - UNRELEASED
     for compatibility reasons is creating incorrect counter name.
     (Jarek Jarcec Cecho via tomwhite)
 
+    MAPREDUCE-4549. Distributed cache conflicts breaks backwards compatability 
+    (Robert Evans via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1374407&r1=1374406&r2=1374407&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
 Fri Aug 17 19:47:35 2012
@@ -30,6 +30,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -48,14 +50,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.yarn.ContainerLogAppender;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
  * Helper class for MR applications
@@ -63,6 +66,8 @@ import org.apache.hadoop.yarn.util.Build
 @Private
 @Unstable
 public class MRApps extends Apps {
+  private static final Log LOG = LogFactory.getLog(MRApps.class);
+  
   public static String toString(JobId jid) {
     return jid.toString();
   }
@@ -263,6 +268,23 @@ public class MRApps extends Apps {
         DistributedCache.getFileClassPaths(conf));
   }
 
+  private static String getResourceDescription(LocalResourceType type) {
+    if(type == LocalResourceType.ARCHIVE) {
+      return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
+    }
+    return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
+  }
+  
+  private static String toString(org.apache.hadoop.yarn.api.records.URL url) {
+    StringBuffer b = new StringBuffer();
+    b.append(url.getScheme()).append("://").append(url.getHost());
+    if(url.getPort() >= 0) {
+      b.append(":").append(url.getPort());
+    }
+    b.append(url.getFile());
+    return b.toString();
+  }
+  
   // TODO - Move this to MR!
   // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
   // long[], boolean[], Path[], FileType)
@@ -308,6 +330,17 @@ public class MRApps extends Apps {
           throw new IllegalArgumentException("Resource name must be relative");
         }
         String linkName = name.toUri().getPath();
+        LocalResource orig = localResources.get(linkName);
+        org.apache.hadoop.yarn.api.records.URL url = 
+          ConverterUtils.getYarnUrlFromURI(p.toUri());
+        if(orig != null && !orig.getResource().equals(url)) {
+          LOG.warn(
+              getResourceDescription(orig.getType()) + 
+              toString(orig.getResource()) + " conflicts with " + 
+              getResourceDescription(type) + toString(url) + 
+              " This will be an error in Hadoop 2.0");
+          continue;
+        }
         localResources.put(
             linkName,
             BuilderUtils.newLocalResource(

Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1374407&r1=1374406&r2=1374407&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
 Fri Aug 17 19:47:35 2012
@@ -19,23 +19,30 @@
 package org.apache.hadoop.mapreduce.v2.util;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-
 import org.junit.Test;
+
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestMRApps {
 
@@ -168,5 +175,138 @@ public class TestMRApps {
     assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking 
effect!",
       env_str.indexOf("$PWD:job.jar"), 0);
   }
-
+  
+  @Test
+  public void testSetupDistributedCacheEmpty() throws IOException {
+    Configuration conf = new Configuration();
+    Map<String, LocalResource> localResources = new HashMap<String, 
LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+    assertTrue("Empty Config did not produce an empty list of resources",
+        localResources.isEmpty());
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testSetupDistributedCacheConflicts() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    
+    URI mockUri = URI.create("mockfs://mock/");
+    FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
+        .getRawFileSystem();
+    
+    URI archive = new URI("mockfs://mock/tmp/something.zip#something");
+    Path archivePath = new Path(archive);
+    URI file = new URI("mockfs://mock/tmp/something.txt#something");
+    Path filePath = new Path(file);
+    
+    when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
+    when(mockFs.resolvePath(filePath)).thenReturn(filePath);
+    
+    DistributedCache.addCacheArchive(archive, conf);
+    conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
+    conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
+    conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
+    DistributedCache.addCacheFile(file, conf);
+    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
+    conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
+    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
+    Map<String, LocalResource> localResources = 
+      new HashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+    
+    assertEquals(1, localResources.size());
+    LocalResource lr = localResources.get("something");
+    //Archive wins
+    assertNotNull(lr);
+    assertEquals(10l, lr.getSize());
+    assertEquals(10l, lr.getTimestamp());
+    assertEquals(LocalResourceType.ARCHIVE, lr.getType());
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testSetupDistributedCacheConflictsFiles() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    
+    URI mockUri = URI.create("mockfs://mock/");
+    FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
+        .getRawFileSystem();
+    
+    URI file = new URI("mockfs://mock/tmp/something.zip#something");
+    Path filePath = new Path(file);
+    URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
+    Path file2Path = new Path(file);
+    
+    when(mockFs.resolvePath(filePath)).thenReturn(filePath);
+    when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
+    
+    DistributedCache.addCacheFile(file, conf);
+    DistributedCache.addCacheFile(file2, conf);
+    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
+    conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
+    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
+    Map<String, LocalResource> localResources = 
+      new HashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+    
+    assertEquals(1, localResources.size());
+    LocalResource lr = localResources.get("something");
+    //First one wins
+    assertNotNull(lr);
+    assertEquals(10l, lr.getSize());
+    assertEquals(10l, lr.getTimestamp());
+    assertEquals(LocalResourceType.FILE, lr.getType());
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testSetupDistributedCache() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    
+    URI mockUri = URI.create("mockfs://mock/");
+    FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
+        .getRawFileSystem();
+    
+    URI archive = new URI("mockfs://mock/tmp/something.zip");
+    Path archivePath = new Path(archive);
+    URI file = new URI("mockfs://mock/tmp/something.txt#something");
+    Path filePath = new Path(file);
+    
+    when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
+    when(mockFs.resolvePath(filePath)).thenReturn(filePath);
+    
+    DistributedCache.addCacheArchive(archive, conf);
+    conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
+    conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
+    conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
+    DistributedCache.addCacheFile(file, conf);
+    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
+    conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
+    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
+    Map<String, LocalResource> localResources = 
+      new HashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+    assertEquals(2, localResources.size());
+    LocalResource lr = localResources.get("something.zip");
+    assertNotNull(lr);
+    assertEquals(10l, lr.getSize());
+    assertEquals(10l, lr.getTimestamp());
+    assertEquals(LocalResourceType.ARCHIVE, lr.getType());
+    lr = localResources.get("something");
+    assertNotNull(lr);
+    assertEquals(11l, lr.getSize());
+    assertEquals(11l, lr.getTimestamp());
+    assertEquals(LocalResourceType.FILE, lr.getType());
+  }
+  
+  static class MockFileSystem extends FilterFileSystem {
+    MockFileSystem() {
+      super(mock(FileSystem.class));
+    }
+    public void initialize(URI name, Configuration conf) throws IOException {}
+  }
+  
 }


Reply via email to