Author: wang
Date: Wed Jul 23 20:45:10 2014
New Revision: 1612941

URL: http://svn.apache.org/r1612941
Log:
Merge from trunk to branch pt 2

Modified:
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/   (props 
changed)
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt   
(contents, props changed)
    
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java

Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1612403-1612915

Modified: 
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt?rev=1612941&r1=1612940&r2=1612941&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt 
(original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt 
Wed Jul 23 20:45:10 2014
@@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5971. Move the default options for distcp -p to
     DistCpOptionSwitch. (clamb via wang)
 
+    MAPREDUCE-5963. ShuffleHandler DB schema should be versioned with
+    compatible/incompatible changes (Junping Du via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: 
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged 
/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1612403-1612915

Modified: 
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1612941&r1=1612940&r2=1612941&view=diff
==============================================================================
--- 
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
 (original)
+++ 
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
 Wed Jul 23 20:45:10 2014
@@ -82,10 +82,13 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
@@ -125,6 +128,7 @@ import org.jboss.netty.handler.stream.Ch
 import org.jboss.netty.util.CharsetUtil;
 import org.mortbay.jetty.HttpHeaders;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
@@ -146,8 +150,9 @@ public class ShuffleHandler extends Auxi
       Pattern.CASE_INSENSITIVE);
 
   private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
-  private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version";
-  private static final String STATE_DB_SCHEMA_VERSION = "1.0";
+  private static final String STATE_DB_SCHEMA_VERSION_KEY = 
"shuffle-schema-version";
+  protected static final NMDBSchemaVersion CURRENT_VERSION_INFO = 
+      NMDBSchemaVersion.newInstance(1, 0);
 
   private int port;
   private ChannelFactory selector;
@@ -466,18 +471,15 @@ public class ShuffleHandler extends Auxi
     Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
     LOG.info("Using state database at " + dbPath + " for recovery");
     File dbfile = new File(dbPath.toString());
-    byte[] schemaVersionData;
     try {
       stateDb = JniDBFactory.factory.open(dbfile, options);
-      schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
     } catch (NativeDB.DBException e) {
       if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
         LOG.info("Creating state database at " + dbfile);
         options.createIfMissing(true);
         try {
           stateDb = JniDBFactory.factory.open(dbfile, options);
-          schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
-          stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData);
+          storeVersion();
         } catch (DBException dbExc) {
           throw new IOException("Unable to create state store", dbExc);
         }
@@ -485,15 +487,69 @@ public class ShuffleHandler extends Auxi
         throw e;
       }
     }
-    if (schemaVersionData != null) {
-      String schemaVersion = asString(schemaVersionData);
-      // only support exact schema matches for now
-      if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
-        throw new IOException("Incompatible state database schema, found "
-            + schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
-      }
+    checkVersion();
+  }
+  
+  @VisibleForTesting
+  NMDBSchemaVersion loadVersion() throws IOException {
+    byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
+    // if version is not stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return NMDBSchemaVersion.newInstance(1, 0);
+    }
+    NMDBSchemaVersion version =
+        new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
+    return version;
+  }
+
+  private void storeSchemaVersion(NMDBSchemaVersion version) throws 
IOException {
+    String key = STATE_DB_SCHEMA_VERSION_KEY;
+    byte[] data = 
+        ((NMDBSchemaVersionPBImpl) version).getProto().toByteArray();
+    try {
+      stateDb.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+  
+  private void storeVersion() throws IOException {
+    storeSchemaVersion(CURRENT_VERSION_INFO);
+  }
+  
+  // Only used for test
+  @VisibleForTesting
+  void storeVersion(NMDBSchemaVersion version) throws IOException {
+    storeSchemaVersion(version);
+  }
+
+  protected NMDBSchemaVersion getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+  
+  /**
+   * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+   * 2) Any incompatible change of DB schema is a major upgrade, and any
+   *    compatible change of DB schema is a minor upgrade.
+   * 3) Within a minor upgrade, say 1.1 to 1.2:
+   *    overwrite the version info and proceed as normal.
+   * 4) Within a major upgrade, say 1.2 to 2.0:
+   *    throw exception and indicate user to use a separate upgrade tool to
+   *    upgrade shuffle info or remove incompatible old state.
+   */
+  private void checkVersion() throws IOException {
+    NMDBSchemaVersion loadedVersion = loadVersion();
+    LOG.info("Loaded state DB schema version info " + loadedVersion);
+    if (loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing state DB schedma version info " + getCurrentVersion());
+      storeVersion();
     } else {
-      throw new IOException("State database schema version not found");
+      throw new IOException(
+        "Incompatible version for state DB schema: expecting DB schema version 
" 
+            + getCurrentVersion() + ", but loading version " + loadedVersion);
     }
   }
 

Modified: 
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1612941&r1=1612940&r2=1612941&view=diff
==============================================================================
--- 
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
 (original)
+++ 
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
 Wed Jul 23 20:45:10 2014
@@ -67,6 +67,7 @@ import org.apache.hadoop.metrics2.Metric
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -718,6 +720,94 @@ public class TestShuffleHandler {
       FileUtil.fullyDelete(tmpDir);
     }
   }
+  
+  @Test
+  public void testRecoveryFromOtherVersions() throws IOException {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    final File tmpDir = new File(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir")),
+        TestShuffleHandler.class.getName());
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    ShuffleHandler shuffle = new ShuffleHandler();
+    // emulate aux services startup with recovery enabled
+    shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+    tmpDir.mkdirs();
+    try {
+      shuffle.init(conf);
+      shuffle.start();
+
+      // setup a shuffle token for an application
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+          "identifier".getBytes(), "password".getBytes(), new Text(user),
+          new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffle.initializeApplication(new ApplicationInitializationContext(user,
+          appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+              outputBuffer.getLength())));
+
+      // verify we are authorized to shuffle
+      int rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+      // emulate shuffle handler restart
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+      shuffle.start();
+
+      // verify we are still authorized to shuffle to the old application
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+      NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0);
+      Assert.assertEquals(version, shuffle.getCurrentVersion());
+    
+      // emulate shuffle handler restart with compatible version
+      NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1);
+      // update version info before close shuffle
+      shuffle.storeVersion(version11);
+      Assert.assertEquals(version11, shuffle.loadVersion());
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+      shuffle.start();
+      // shuffle version will be override by CURRENT_VERSION_INFO after restart
+      // successfully.
+      Assert.assertEquals(version, shuffle.loadVersion());
+      // verify we are still authorized to shuffle to the old application
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+    
+      // emulate shuffle handler restart with incompatible version
+      NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1);
+      shuffle.storeVersion(version21);
+      Assert.assertEquals(version21, shuffle.loadVersion());
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+    
+      try {
+        shuffle.start();
+        Assert.fail("Incompatible version, should expect fail here.");
+      } catch (ServiceStateException e) {
+        Assert.assertTrue("Exception message mismatch", 
+        e.getMessage().contains("Incompatible version for state DB schema:"));
+      } 
+    
+    } finally {
+      if (shuffle != null) {
+        shuffle.close();
+      }
+      FileUtil.fullyDelete(tmpDir);
+    }
+  }
 
   private static int getShuffleResponseCode(ShuffleHandler shuffle,
       Token<JobTokenIdentifier> jt) throws IOException {


Reply via email to