Author: jlowe
Date: Thu Aug 14 16:08:53 2014
New Revision: 1617987

URL: http://svn.apache.org/r1617987
Log:
svn merge -c 1617984 FIXES: MAPREDUCE-6010. HistoryServerFileSystemStateStore 
fails to update tokens. Contributed by Jason Lowe

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1617987&r1=1617986&r2=1617987&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Aug 14 16:08:53 2014
@@ -56,6 +56,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5363. Fix doc and spelling for TaskCompletionEvent#getTaskStatus 
       and getStatus (Akira AJISAKA via aw)
 
+    MAPREDUCE-6010. HistoryServerFileSystemStateStore fails to update tokens
+    (jlowe)
+
     MAPREDUCE-5595. Typo in MergeManagerImpl.java (Akira AJISAKA via aw)
 
     MAPREDUCE-5597. Missing alternatives in javadocs for deprecated constructors 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java?rev=1617987&r1=1617986&r2=1617987&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java Thu Aug 14 16:08:53 2014
@@ -24,6 +24,8 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -64,6 +66,7 @@ public class HistoryServerFileSystemStat
   private static final String TOKEN_MASTER_KEY_FILE_PREFIX = "key_";
   private static final String TOKEN_FILE_PREFIX = "token_";
   private static final String TMP_FILE_PREFIX = "tmp-";
+  private static final String UPDATE_TMP_FILE_PREFIX = "update-";
   private static final FsPermission DIR_PERMISSIONS =
       new FsPermission((short)0700);
   private static final FsPermission FILE_PERMISSIONS = Shell.WINDOWS
@@ -90,7 +93,7 @@ public class HistoryServerFileSystemStat
 
   @Override
   protected void startStorage() throws IOException {
-    fs = rootStatePath.getFileSystem(getConfig());
+    fs = createFileSystem();
     createDir(rootStatePath);
     tokenStatePath = new Path(rootStatePath, TOKEN_STATE_DIR_NAME);
     createDir(tokenStatePath);
@@ -101,6 +104,10 @@ public class HistoryServerFileSystemStat
     }
   }
 
+  FileSystem createFileSystem() throws IOException {
+    return rootStatePath.getFileSystem(getConfig());
+  }
+
   @Override
   protected void closeStorage() throws IOException {
     // don't close the filesystem as it's part of the filesystem cache
@@ -127,7 +134,7 @@ public class HistoryServerFileSystemStat
       throw new IOException(tokenPath + " already exists");
     }
 
-    createFile(tokenPath, buildTokenData(tokenId, renewDate));
+    createNewFile(tokenPath, buildTokenData(tokenId, renewDate));
   }
 
   @Override
@@ -136,7 +143,25 @@ public class HistoryServerFileSystemStat
     if (LOG.isDebugEnabled()) {
       LOG.debug("Updating token " + tokenId.getSequenceNumber());
     }
-    createFile(getTokenPath(tokenId), buildTokenData(tokenId, renewDate));
+
+    // Files cannot be atomically replaced, therefore we write a temporary
+    // update file, remove the original token file, then rename the update
+    // file to the token file. During recovery either the token file will be
+    // used or if that is missing and an update file is present then the
+    // update file is used.
+    Path tokenPath = getTokenPath(tokenId);
+    Path tmp = new Path(tokenPath.getParent(),
+        UPDATE_TMP_FILE_PREFIX + tokenPath.getName());
+    writeFile(tmp, buildTokenData(tokenId, renewDate));
+    try {
+      deleteFile(tokenPath);
+    } catch (IOException e) {
+      fs.delete(tmp, false);
+      throw e;
+    }
+    if (!fs.rename(tmp, tokenPath)) {
+      throw new IOException("Could not rename " + tmp + " to " + tokenPath);
+    }
   }
 
   @Override
@@ -168,7 +193,7 @@ public class HistoryServerFileSystemStat
       IOUtils.cleanup(LOG, dataStream);
     }
 
-    createFile(keyPath, memStream.toByteArray());
+    createNewFile(keyPath, memStream.toByteArray());
   }
 
   @Override
@@ -213,23 +238,33 @@ public class HistoryServerFileSystemStat
     }
   }
 
-  private void createFile(Path file, byte[] data) throws IOException {
-    final int WRITE_BUFFER_SIZE = 4096;
+  private void createNewFile(Path file, byte[] data)
+      throws IOException {
     Path tmp = new Path(file.getParent(), TMP_FILE_PREFIX + file.getName());
-    FSDataOutputStream out = fs.create(tmp, FILE_PERMISSIONS, true,
-        WRITE_BUFFER_SIZE, fs.getDefaultReplication(tmp),
-        fs.getDefaultBlockSize(tmp), null);
+    writeFile(tmp, data);
+    try {
+      if (!fs.rename(tmp, file)) {
+        throw new IOException("Could not rename " + tmp + " to " + file);
+      }
+    } catch (IOException e) {
+      fs.delete(tmp, false);
+      throw e;
+    }
+  }
+
+  private void writeFile(Path file, byte[] data) throws IOException {
+    final int WRITE_BUFFER_SIZE = 4096;
+    FSDataOutputStream out = fs.create(file, FILE_PERMISSIONS, true,
+        WRITE_BUFFER_SIZE, fs.getDefaultReplication(file),
+        fs.getDefaultBlockSize(file), null);
     try {
       try {
         out.write(data);
       } finally {
         IOUtils.cleanup(LOG, out);
       }
-      if (!fs.rename(tmp, file)) {
-        throw new IOException("Could not rename " + tmp + " to " + file);
-      }
     } catch (IOException e) {
-      fs.delete(tmp, false);
+      fs.delete(file, false);
       throw e;
     }
   }
@@ -284,6 +319,19 @@ public class HistoryServerFileSystemStat
     state.tokenMasterKeyState.add(key);
   }
 
+  private void loadTokenFromBucket(int bucketId,
+      HistoryServerState state, Path tokenFile, long numTokenFileBytes)
+          throws IOException {
+    MRDelegationTokenIdentifier token =
+        loadToken(state, tokenFile, numTokenFileBytes);
+    int tokenBucketId = getBucketId(token);
+    if (tokenBucketId != bucketId) {
+      throw new IOException("Token " + tokenFile
+          + " should be in bucket " + tokenBucketId + ", found in bucket "
+          + bucketId);
+    }
+  }
+
   private MRDelegationTokenIdentifier loadToken(HistoryServerState state,
       Path tokenFile, long numTokenFileBytes) throws IOException {
     MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
@@ -308,18 +356,29 @@ public class HistoryServerFileSystemStat
     final int bucketId = Integer.parseInt(numStr);
     int numTokens = 0;
     FileStatus[] tokenStats = fs.listStatus(bucket);
+    Set<String> loadedTokens = new HashSet<String>(tokenStats.length);
     for (FileStatus stat : tokenStats) {
       String name = stat.getPath().getName();
       if (name.startsWith(TOKEN_FILE_PREFIX)) {
-        MRDelegationTokenIdentifier token =
-            loadToken(state, stat.getPath(), stat.getLen());
-        int tokenBucketId = getBucketId(token);
-        if (tokenBucketId != bucketId) {
-          throw new IOException("Token " + stat.getPath()
-              + " should be in bucket " + tokenBucketId + ", found in bucket "
-              + bucketId);
-        }
+        loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen());
+        loadedTokens.add(name);
         ++numTokens;
+      } else if (name.startsWith(UPDATE_TMP_FILE_PREFIX)) {
+        String tokenName = name.substring(UPDATE_TMP_FILE_PREFIX.length());
+        if (loadedTokens.contains(tokenName)) {
+          // already have the token, update may be partial so ignore it
+          fs.delete(stat.getPath(), false);
+        } else {
+          // token is missing, so try to parse the update temp file
+          loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen());
+          fs.rename(stat.getPath(),
+              new Path(stat.getPath().getParent(), tokenName));
+          loadedTokens.add(tokenName);
+          ++numTokens;
+        }
+      } else if (name.startsWith(TMP_FILE_PREFIX)) {
+        // cleanup incomplete temp files
+        fs.delete(stat.getPath(), false);
       } else {
         LOG.warn("Skipping unexpected file in history server token bucket: "
             + stat.getPath());

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java?rev=1617987&r1=1617986&r2=1617987&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java Thu Aug 14 16:08:53 2014
@@ -21,12 +21,19 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
@@ -35,6 +42,7 @@ import org.apache.hadoop.security.token.
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentMatcher;
 
 public class TestHistoryServerFileSystemStateStoreService {
 
@@ -74,8 +82,8 @@ public class TestHistoryServerFileSystem
     return store;
   }
 
-  @Test
-  public void testTokenStore() throws IOException {
+  private void testTokenStore(String stateStoreUri) throws IOException {
+    conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI, stateStoreUri);
     HistoryServerStateStoreService store = createAndStartStore();
 
     HistoryServerState state = store.loadState();
@@ -161,4 +169,77 @@ public class TestHistoryServerFileSystem
     assertTrue("missing master key 3",
         state.tokenMasterKeyState.contains(key3));
   }
+
+  @Test
+  public void testTokenStore() throws IOException {
+    testTokenStore(testDir.getAbsoluteFile().toURI().toString());
+  }
+
+  @Test
+  public void testTokenStoreHdfs() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    conf = cluster.getConfiguration(0);
+    try {
+      testTokenStore("/tmp/historystore");
+    } finally {
+        cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testUpdatedTokenRecovery() throws IOException {
+    IOException intentionalErr = new IOException("intentional error");
+    FileSystem fs = FileSystem.getLocal(conf);
+    final FileSystem spyfs = spy(fs);
+    // make the update token process fail halfway through where we're left
+    // with just the temporary update file and no token file
+    ArgumentMatcher<Path> updateTmpMatcher = new ArgumentMatcher<Path>() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof Path) {
+          return ((Path) argument).getName().startsWith("update");
+        }
+        return false;
+      }
+    };
+    doThrow(intentionalErr)
+        .when(spyfs).rename(argThat(updateTmpMatcher), isA(Path.class));
+
+    conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI,
+        testDir.getAbsoluteFile().toURI().toString());
+    HistoryServerStateStoreService store =
+        new HistoryServerFileSystemStateStoreService() {
+          @Override
+          FileSystem createFileSystem() throws IOException {
+            return spyfs;
+          }
+    };
+    store.init(conf);
+    store.start();
+
+    final MRDelegationTokenIdentifier token1 =
+        new MRDelegationTokenIdentifier(new Text("tokenOwner1"),
+            new Text("tokenRenewer1"), new Text("tokenUser1"));
+    token1.setSequenceNumber(1);
+    final Long tokenDate1 = 1L;
+    store.storeToken(token1, tokenDate1);
+    final Long newTokenDate1 = 975318642L;
+    try {
+      store.updateToken(token1, newTokenDate1);
+      fail("intentional error not thrown");
+    } catch (IOException e) {
+      assertEquals(intentionalErr, e);
+    }
+    store.close();
+
+    // verify the update file is seen and parsed upon recovery when
+    // original token file is missing
+    store = createAndStartStore();
+    HistoryServerState state = store.loadState();
+    assertEquals("incorrect loaded token count", 1, state.tokenState.size());
+    assertTrue("missing token 1", state.tokenState.containsKey(token1));
+    assertEquals("incorrect token 1 date", newTokenDate1,
+        state.tokenState.get(token1));
+    store.close();
+  }
 }


Reply via email to