Author: wang Date: Wed Jul 23 20:45:10 2014 New Revision: 1612941 URL: http://svn.apache.org/r1612941 Log: Merge from trunk to branch pt 2
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1612403-1612915 Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt?rev=1612941&r1=1612940&r2=1612941&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt Wed Jul 23 20:45:10 2014 @@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5971. Move the default options for distcp -p to DistCpOptionSwitch. (clamb via wang) + MAPREDUCE-5963. 
ShuffleHandler DB schema should be versioned with + compatible/incompatible changes (Junping Du via jlowe) + OPTIMIZATIONS BUG FIXES Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1612403-1612915 Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1612941&r1=1612940&r2=1612941&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Wed Jul 23 20:45:10 2014 @@ -82,10 +82,13 @@ import org.apache.hadoop.security.token. 
import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.util.ConverterUtils; import org.fusesource.leveldbjni.JniDBFactory; @@ -125,6 +128,7 @@ import org.jboss.netty.handler.stream.Ch import org.jboss.netty.util.CharsetUtil; import org.mortbay.jetty.HttpHeaders; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; @@ -146,8 +150,9 @@ public class ShuffleHandler extends Auxi Pattern.CASE_INSENSITIVE); private static final String STATE_DB_NAME = "mapreduce_shuffle_state"; - private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version"; - private static final String STATE_DB_SCHEMA_VERSION = "1.0"; + private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version"; + protected static final NMDBSchemaVersion CURRENT_VERSION_INFO = + NMDBSchemaVersion.newInstance(1, 0); private int port; private ChannelFactory selector; @@ -466,18 +471,15 @@ public class ShuffleHandler extends Auxi Path dbPath = new Path(recoveryRoot, STATE_DB_NAME); LOG.info("Using state database at " + dbPath + " for recovery"); File dbfile = new File(dbPath.toString()); - 
byte[] schemaVersionData; try { stateDb = JniDBFactory.factory.open(dbfile, options); - schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY)); } catch (NativeDB.DBException e) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { LOG.info("Creating state database at " + dbfile); options.createIfMissing(true); try { stateDb = JniDBFactory.factory.open(dbfile, options); - schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION); - stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData); + storeVersion(); } catch (DBException dbExc) { throw new IOException("Unable to create state store", dbExc); } @@ -485,15 +487,69 @@ public class ShuffleHandler extends Auxi throw e; } } - if (schemaVersionData != null) { - String schemaVersion = asString(schemaVersionData); - // only support exact schema matches for now - if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) { - throw new IOException("Incompatible state database schema, found " - + schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION); - } + checkVersion(); + } + + @VisibleForTesting + NMDBSchemaVersion loadVersion() throws IOException { + byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. 
+ if (data == null || data.length == 0) { + return NMDBSchemaVersion.newInstance(1, 0); + } + NMDBSchemaVersion version = + new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data)); + return version; + } + + private void storeSchemaVersion(NMDBSchemaVersion version) throws IOException { + String key = STATE_DB_SCHEMA_VERSION_KEY; + byte[] data = + ((NMDBSchemaVersionPBImpl) version).getProto().toByteArray(); + try { + stateDb.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + private void storeVersion() throws IOException { + storeSchemaVersion(CURRENT_VERSION_INFO); + } + + // Only used for test + @VisibleForTesting + void storeVersion(NMDBSchemaVersion version) throws IOException { + storeSchemaVersion(version); + } + + protected NMDBSchemaVersion getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. + * 2) Any incompatible change of DB schema is a major upgrade, and any + * compatible change of DB schema is a minor upgrade. + * 3) Within a minor upgrade, say 1.1 to 1.2: + * overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: + * throw exception and indicate user to use a separate upgrade tool to + * upgrade shuffle info or remove incompatible old state. 
+ */ + private void checkVersion() throws IOException { + NMDBSchemaVersion loadedVersion = loadVersion(); + LOG.info("Loaded state DB schema version info " + loadedVersion); + if (loadedVersion.equals(getCurrentVersion())) { + return; + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing state DB schema version info " + getCurrentVersion()); + storeVersion(); } else { - throw new IOException("State database schema version not found"); + throw new IOException( + "Incompatible version for state DB schema: expecting DB schema version " + + getCurrentVersion() + ", but loading version " + loadedVersion); } } Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1612941&r1=1612940&r2=1612941&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Wed Jul 23 20:45:10 2014 @@ -67,6 +67,7 @@ import org.apache.hadoop.metrics2.Metric import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.ServiceStateException; import org.apache.hadoop.util.PureJavaCrc32; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; @@ -718,6 +720,94 @@ public class TestShuffleHandler { FileUtil.fullyDelete(tmpDir); } } + + @Test + public void testRecoveryFromOtherVersions() throws IOException { + final String user = "someuser"; + final ApplicationId appId = ApplicationId.newInstance(12345, 1); + final File tmpDir = new File(System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestShuffleHandler.class.getName()); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + ShuffleHandler shuffle = new ShuffleHandler(); + // emulate aux services startup with recovery enabled + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + tmpDir.mkdirs(); + try { + shuffle.init(conf); + shuffle.start(); + + // setup a shuffle token for an application + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>( + "identifier".getBytes(), "password".getBytes(), new Text(user), + new Text("shuffleService")); + jt.write(outputBuffer); + shuffle.initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + + // verify we are authorized to shuffle + int rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // emulate shuffle handler restart + 
shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + shuffle.start(); + + // verify we are still authorized to shuffle to the old application + rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0); + Assert.assertEquals(version, shuffle.getCurrentVersion()); + + // emulate shuffle handler restart with compatible version + NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1); + // update version info before closing shuffle + shuffle.storeVersion(version11); + Assert.assertEquals(version11, shuffle.loadVersion()); + shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + shuffle.start(); + // shuffle version will be overridden by CURRENT_VERSION_INFO after restart + // successfully. + Assert.assertEquals(version, shuffle.loadVersion()); + // verify we are still authorized to shuffle to the old application + rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // emulate shuffle handler restart with incompatible version + NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1); + shuffle.storeVersion(version21); + Assert.assertEquals(version21, shuffle.loadVersion()); + shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + + try { + shuffle.start(); + Assert.fail("Incompatible version, should expect fail here."); + } catch (ServiceStateException e) { + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for state DB schema:")); + } + + } finally { + if (shuffle != null) { + shuffle.close(); + } + FileUtil.fullyDelete(tmpDir); + } + } private static int getShuffleResponseCode(ShuffleHandler shuffle, Token<JobTokenIdentifier> jt)
throws IOException {