Author: tucu Date: Thu Jul 26 13:23:05 2012 New Revision: 1365979 URL: http://svn.apache.org/viewvc?rev=1365979&view=rev Log: MAPREDUCE-4417. add support for encrypted shuffle (tucu)
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1365979&r1=1365978&r2=1365979&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jul 26 13:23:05 2012 @@ -135,6 +135,8 @@ Branch-2 ( Unreleased changes ) MAPREDUCE-987. Exposing MiniDFS and MiniMR clusters as a single process command-line. (ahmed via tucu) + MAPREDUCE-4417. add support for encrypted shuffle (tucu) + IMPROVEMENTS MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1365979&r1=1365978&r2=1365979&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Thu Jul 26 13:23:05 2012 @@ -473,5 +473,10 @@ <!-- The above 2 fields are accessed locally and only via methods that are synchronized. 
--> - + + <Match> + <Class name="org.apache.hadoop.mapred.ShuffleHandler" /> + <Field name="sslFileBufferSize" /> + <Bug pattern="IS2_INCONSISTENT_SYNC" /> + </Match> </FindBugsFilter> Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1365979&r1=1365978&r2=1365979&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Jul 26 13:23:05 2012 @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TypeConverter; @@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.jobhi import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; +import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; @@ -108,7 +110,8 @@ public abstract class TaskImpl implement private long scheduledTime; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - + + protected boolean encryptedShuffle; protected Credentials credentials; protected Token<JobTokenIdentifier> jobToken; @@ -274,6 +277,8 @@ public abstract class TaskImpl implement this.jobToken = jobToken; this.metrics = metrics; this.appContext = appContext; + this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, + MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT); // See if this is from a previous generation. if (completedTasksFromPreviousRun != null @@ -637,9 +642,10 @@ public abstract class TaskImpl implement TaskAttemptCompletionEvent tce = recordFactory .newRecordInstance(TaskAttemptCompletionEvent.class); tce.setEventId(-1); - tce.setMapOutputServerAddress("http://" - + attempt.getNodeHttpAddress().split(":")[0] + ":" - + attempt.getShufflePort()); + String scheme = (encryptedShuffle) ? 
"https://" : "http://"; + tce.setMapOutputServerAddress(scheme + + attempt.getNodeHttpAddress().split(":")[0] + ":" + + attempt.getShufflePort()); tce.setStatus(status); tce.setAttemptId(attempt.getID()); int runTime = 0; Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1365979&r1=1365978&r2=1365979&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Thu Jul 26 13:23:05 2012 @@ -79,4 +79,9 @@ public interface MRConfig { public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10; public static final String MAX_BLOCK_LOCATIONS_KEY = "mapreduce.job.max.split.locations"; + + public static final String SHUFFLE_SSL_ENABLED_KEY = + "mapreduce.shuffle.ssl.enabled"; + + public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false; } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1365979&r1=1365978&r2=1365979&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Jul 26 13:23:05 2012 @@ -25,11 +25,13 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.HttpURLConnection; import java.net.URLConnection; +import java.security.GeneralSecurityException; import java.util.HashSet; import java.util.List; import java.util.Set; import javax.crypto.SecretKey; +import javax.net.ssl.HttpsURLConnection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,9 +44,11 @@ import org.apache.hadoop.mapred.Counters import org.apache.hadoop.mapred.IFileInputStream; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; @@ -92,6 +96,9 @@ class Fetcher<K,V> extends Thread { private volatile boolean stopped = false; + private static boolean sslShuffle; + private static SSLFactory sslFactory; + public Fetcher(JobConf job, TaskAttemptID reduceId, ShuffleScheduler<K,V> 
scheduler, MergeManager<K,V> merger, Reporter reporter, ShuffleClientMetrics metrics,
@@ -135,6 +142,20 @@ class Fetcher<K,V> extends Thread {
     setName("fetcher#" + id);
     setDaemon(true);
+
+    synchronized (Fetcher.class) {
+      sslShuffle = job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+                                  MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
+      if (sslShuffle && sslFactory == null) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
+        try {
+          sslFactory.init();
+        } catch (Exception ex) {
+          sslFactory.destroy();
+          throw new RuntimeException(ex);
+        }
+      }
+    }
   }

   public void run() {
@@ -173,8 +194,25 @@
     } catch (InterruptedException ie) {
       LOG.warn("Got interrupt while joining " + getName(), ie);
     }
+    if (sslFactory != null) {
+      sslFactory.destroy();
+    }
   }

+  protected HttpURLConnection openConnection(URL url) throws IOException {
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    if (sslShuffle) {
+      HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
+      try {
+        httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
+      } catch (GeneralSecurityException ex) {
+        throw new IOException(ex);
+      }
+      httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
+    }
+    return conn;
+  }
+
   /**
    * The crux of the matter...
    *
@@ -205,7 +243,7 @@
     try {
       URL url = getMapOutputURL(host, maps);
-      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+      HttpURLConnection connection = openConnection(url);

       // generate hash of the url
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jul 26 13:23:05 2012
@@ -513,6 +513,21 @@
 </property>

 <property>
+  <name>mapreduce.shuffle.ssl.enabled</name>
+  <value>false</value>
+  <description>
+    Whether to use SSL for the Shuffle HTTP endpoints.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.shuffle.ssl.file.buffer.size</name>
+  <value>65536</value>
+  <description>Buffer size for reading spills from file when using SSL.
+ </description> +</property> + +<property> <name>mapreduce.reduce.markreset.buffer.percent</name> <value>0.0</value> <description>The percentage of memory -relative to the maximum heap size- to Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java?rev=1365979&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java Thu Jul 26 13:23:05 2012 @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.security.ssl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRClientCluster; +import org.apache.hadoop.mapred.MiniMRClientClusterFactory; +import org.apache.hadoop.mapred.RunningJob; + +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.security.ssl.KeyStoreTestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.Assert; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.URL; + +public class TestEncryptedShuffle { + + private static final String BASEDIR = + System.getProperty("test.build.dir", "target/test-dir") + "/" + + TestEncryptedShuffle.class.getSimpleName(); + + @BeforeClass + public static void setUp() throws Exception { + File base = new File(BASEDIR); + FileUtil.fullyDelete(base); + base.mkdirs(); + } + + @Before + public void createCustomYarnClasspath() throws Exception { + String classpathDir = + KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class); + + URL url = Thread.currentThread().getContextClassLoader(). + getResource("mrapp-generated-classpath"); + File f = new File(url.getPath()); + BufferedReader reader = new BufferedReader(new FileReader(f)); + String cp = reader.readLine(); + cp = cp + ":" + classpathDir; + f = new File(classpathDir, "mrapp-generated-classpath"); + Writer writer = new FileWriter(f); + writer.write(cp); + writer.close(); + new File(classpathDir, "core-site.xml").delete(); + } + + @After + public void cleanUpMiniClusterSpecialConfig() throws Exception { + String classpathDir = + KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class); + new File(classpathDir, "mrapp-generated-classpath").delete(); + new File(classpathDir, "core-site.xml").delete(); + String keystoresDir = new File(BASEDIR).getAbsolutePath(); + KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, classpathDir); + } + + private MiniDFSCluster dfsCluster = null; + private MiniMRClientCluster mrCluster = null; + + private void startCluster(Configuration conf) throws Exception { + if (System.getProperty("hadoop.log.dir") == null) { + System.setProperty("hadoop.log.dir", "target/test-dir"); + } + conf.set("dfs.block.access.token.enable", "false"); + conf.set("dfs.permissions", "true"); + conf.set("hadoop.security.authentication", "simple"); + dfsCluster = new MiniDFSCluster(conf, 1, true, null); + FileSystem fileSystem = dfsCluster.getFileSystem(); + fileSystem.mkdirs(new Path("/tmp")); + fileSystem.mkdirs(new Path("/user")); + fileSystem.mkdirs(new Path("/hadoop/mapred/system")); + fileSystem.setPermission( + new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission( + new Path("/user"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission( + new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------")); + FileSystem.setDefaultUri(conf, fileSystem.getUri()); + mrCluster = MiniMRClientClusterFactory.create(this.getClass(), 1, conf); + + // so the minicluster conf is avail to 
the containers. + String classpathDir = + KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class); + Writer writer = new FileWriter(classpathDir + "/core-site.xml"); + mrCluster.getConfig().writeXml(writer); + writer.close(); + } + + private void stopCluster() throws Exception { + if (mrCluster != null) { + mrCluster.stop(); + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + } + + protected JobConf getJobConf() throws IOException { + return new JobConf(mrCluster.getConfig()); + } + + private void encryptedShuffleWithCerts(boolean useClientCerts) + throws Exception { + try { + Configuration conf = new Configuration(); + String keystoresDir = new File(BASEDIR).getAbsolutePath(); + String sslConfsDir = + KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class); + KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfsDir, conf, + useClientCerts); + conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true); + startCluster(conf); + FileSystem fs = FileSystem.get(getJobConf()); + Path inputDir = new Path("input"); + fs.mkdirs(inputDir); + Writer writer = + new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + writer.write("hello"); + writer.close(); + + Path outputDir = new Path("output", "output"); + + JobConf jobConf = new JobConf(getJobConf()); + jobConf.setInt("mapred.map.tasks", 1); + jobConf.setInt("mapred.map.max.attempts", 1); + jobConf.setInt("mapred.reduce.max.attempts", 1); + jobConf.set("mapred.input.dir", inputDir.toString()); + jobConf.set("mapred.output.dir", outputDir.toString()); + JobClient jobClient = new JobClient(jobConf); + RunningJob runJob = jobClient.submitJob(jobConf); + runJob.waitForCompletion(); + Assert.assertTrue(runJob.isComplete()); + Assert.assertTrue(runJob.isSuccessful()); + } finally { + stopCluster(); + } + } + + @Test + public void encryptedShuffleWithClientCerts() throws Exception { + encryptedShuffleWithCerts(true); + } + + @Test + public void encryptedShuffleWithoutClientCerts() throws Exception { + encryptedShuffleWithCerts(false); + } + +} + Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1365979&r1=1365978&r2=1365979&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Jul 26 13:23:05 2012 @@ -55,7 +55,9 @@ import org.apache.hadoop.fs.LocalDirAllo import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader; @@ -101,6 +103,8 @@ import org.jboss.netty.handler.codec.htt import 
org.jboss.netty.handler.codec.http.HttpResponseEncoder; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.QueryStringDecoder; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.stream.ChunkedFile; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.CharsetUtil; @@ -114,6 +118,8 @@ public class ShuffleHandler extends Abst private int port; private ChannelFactory selector; private final ChannelGroup accepted = new DefaultChannelGroup(); + private HttpPipelineFactory pipelineFact; + private int sslFileBufferSize; public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce.shuffle"; @@ -126,6 +132,11 @@ public class ShuffleHandler extends Abst public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port"; public static final int DEFAULT_SHUFFLE_PORT = 8080; + public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = + "mapreduce.shuffle.ssl.file.buffer.size"; + + public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; + @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @Metric("Shuffle output in bytes") @@ -249,7 +260,11 @@ public class ShuffleHandler extends Abst public synchronized void start() { Configuration conf = getConfig(); ServerBootstrap bootstrap = new ServerBootstrap(selector); - HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf); + try { + pipelineFact = new HttpPipelineFactory(conf); + } catch (Exception ex) { + throw new RuntimeException(ex); + } bootstrap.setPipelineFactory(pipelineFact); port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); Channel ch = bootstrap.bind(new InetSocketAddress(port)); @@ -259,6 +274,9 @@ public class ShuffleHandler extends Abst pipelineFact.SHUFFLE.setPort(port); LOG.info(getName() + " listening on port " + port); super.start(); + + sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, + DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); } @Override @@ -266,6 +284,7 @@ public class ShuffleHandler extends Abst accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS); ServerBootstrap bootstrap = new ServerBootstrap(selector); bootstrap.releaseExternalResources(); + pipelineFact.destroy(); super.stop(); } @@ -283,22 +302,38 @@ public class ShuffleHandler extends Abst class HttpPipelineFactory implements ChannelPipelineFactory { final Shuffle SHUFFLE; + private SSLFactory sslFactory; - public HttpPipelineFactory(Configuration conf) { + public HttpPipelineFactory(Configuration conf) throws Exception { SHUFFLE = new Shuffle(conf); + if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, + MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) { + sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); + sslFactory.init(); + } + } + + public void destroy() { + if (sslFactory != null) { + sslFactory.destroy(); + } } @Override public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new HttpRequestDecoder(), - new HttpChunkAggregator(1 << 16), - new HttpResponseEncoder(), - new ChunkedWriteHandler(), - SHUFFLE); - // TODO factor security manager into pipeline - // TODO factor out encode/decode to permit binary shuffle - // TODO factor out decode of index to permit alt. 
models + ChannelPipeline pipeline = Channels.pipeline(); + if (sslFactory != null) { + pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); + } + pipeline.addLast("decoder", new HttpRequestDecoder()); + pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("chunking", new ChunkedWriteHandler()); + pipeline.addLast("shuffle", SHUFFLE); + return pipeline; + // TODO factor security manager into pipeline + // TODO factor out encode/decode to permit binary shuffle + // TODO factor out decode of index to permit alt. models } } @@ -483,17 +518,25 @@ public class ShuffleHandler extends Abst LOG.info(spillfile + " not found"); return null; } - final FileRegion partition = new DefaultFileRegion( - spill.getChannel(), info.startOffset, info.partLength); - ChannelFuture writeFuture = ch.write(partition); - writeFuture.addListener(new ChannelFutureListener() { - // TODO error handling; distinguish IO/connection failures, - // attribute to appropriate spill output - @Override - public void operationComplete(ChannelFuture future) { - partition.releaseExternalResources(); - } - }); + ChannelFuture writeFuture; + if (ch.getPipeline().get(SslHandler.class) == null) { + final FileRegion partition = new DefaultFileRegion( + spill.getChannel(), info.startOffset, info.partLength); + writeFuture = ch.write(partition); + writeFuture.addListener(new ChannelFutureListener() { + // TODO error handling; distinguish IO/connection failures, + // attribute to appropriate spill output + @Override + public void operationComplete(ChannelFuture future) { + partition.releaseExternalResources(); + } + }); + } else { + // HTTPS cannot be done with zero copy. + writeFuture = ch.write(new ChunkedFile(spill, info.startOffset, + info.partLength, + sslFileBufferSize)); + } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(info.partLength); // optimistic return writeFuture; Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm?rev=1365979&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm Thu Jul 26 13:23:05 2012 @@ -0,0 +1,320 @@ +~~ Licensed under the Apache License, Version 2.0 (the "License"); +~~ you may not use this file except in compliance with the License. +~~ You may obtain a copy of the License at +~~ +~~ http://www.apache.org/licenses/LICENSE-2.0 +~~ +~~ Unless required by applicable law or agreed to in writing, software +~~ distributed under the License is distributed on an "AS IS" BASIS, +~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~~ See the License for the specific language governing permissions and +~~ limitations under the License. See accompanying LICENSE file. 
+
+  ---
+  Hadoop Map Reduce Next Generation-${project.version} - Encrypted Shuffle
+  ---
+  ---
+  ${maven.build.timestamp}
+
+Hadoop MapReduce Next Generation - Encrypted Shuffle
+
+  \[ {{{./index.html}Go Back}} \]
+
+* {Introduction}
+
+  The Encrypted Shuffle capability allows encryption of the MapReduce shuffle
+  using HTTPS, with optional client authentication (also known as
+  bi-directional HTTPS, or HTTPS with client certificates). It comprises:
+
+  * A Hadoop configuration setting for toggling the shuffle between HTTP and
+    HTTPS.
+
+  * Hadoop configuration settings for specifying the keystore and truststore
+    properties (location, type, passwords) used by the shuffle service and the
+    reducer tasks fetching shuffle data.
+
+  * A way to re-load truststores across the cluster (when a node is added or
+    removed).
+
+* {Configuration}
+
+** <<<core-site.xml>>> Properties
+
+  To enable encrypted shuffle, set the following properties in core-site.xml of
+  all nodes in the cluster:
+
+*--------------------------------------+---------------------+-----------------+
+| <<Property>>                          | <<Default Value>>   | <<Explanation>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.require.client.cert>>> | <<<false>>> | Whether client certificates are required |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.hostname.verifier>>> | <<<DEFAULT>>> | The hostname verifier to provide for HttpsURLConnections. Valid values are: <<DEFAULT>>, <<STRICT>>, <<STRICT_IE6>>, <<DEFAULT_AND_LOCALHOST>> and <<ALLOW_ALL>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.keystores.factory.class>>> | <<<org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory>>> | The KeyStoresFactory implementation to use |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.server.conf>>> | <<<ssl-server.xml>>> | Resource file from which ssl server keystore information will be extracted. This file is looked up in the classpath; typically it should be in the Hadoop conf/ directory |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.client.conf>>> | <<<ssl-client.xml>>> | Resource file from which ssl client keystore information will be extracted. This file is looked up in the classpath; typically it should be in the Hadoop conf/ directory |
+*--------------------------------------+---------------------+-----------------+
+
+  <<IMPORTANT:>> Currently, requiring client certificates should be set to false.
+  Refer to the {{{ClientCertificates}Client Certificates}} section for details.
+
+  <<IMPORTANT:>> All these properties should be marked as final in the cluster
+  configuration files.
+
+*** Example:
+
+------
+  ...
+  <property>
+    <name>hadoop.ssl.require.client.cert</name>
+    <value>false</value>
+    <final>true</final>
+  </property>
+
+  <property>
+    <name>hadoop.ssl.hostname.verifier</name>
+    <value>DEFAULT</value>
+    <final>true</final>
+  </property>
+
+  <property>
+    <name>hadoop.ssl.keystores.factory.class</name>
+    <value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
+    <final>true</final>
+  </property>
+
+  <property>
+    <name>hadoop.ssl.server.conf</name>
+    <value>ssl-server.xml</value>
+    <final>true</final>
+  </property>
+
+  <property>
+    <name>hadoop.ssl.client.conf</name>
+    <value>ssl-client.xml</value>
+    <final>true</final>
+  </property>
+  ...
+------ + +** <<<mapred-site.xml>>> Properties + + To enable encrypted shuffle, set the following property in mapred-site.xml + of all nodes in the cluster: + +*--------------------------------------+---------------------+-----------------+ +| <<Property>> | <<Default Value>> | <<Explanation>> | +*--------------------------------------+---------------------+-----------------+ +| <<<mapreduce.shuffle.ssl.enabled>>> | <<<false>>> | Whether encrypted shuffle is enabled | +*--------------------------------------+---------------------+-----------------+ + + <<IMPORTANT:>> This property should be marked as final in the cluster + configuration files. + +*** Example: + +------ + ... + <property> + <name>mapreduce.shuffle.ssl.enabled</name> + <value>true</value> + <final>true</final> + </property> + ... +------ + + The Linux container executor should be set to prevent job tasks from + reading the server keystore information and gaining access to the shuffle + server certificates. + + Refer to Hadoop Kerberos configuration for details on how to do this. + +* {Keystore and Truststore Settings} + + Currently <<<FileBasedKeyStoresFactory>>> is the only <<<KeyStoresFactory>>> + implementation. The <<<FileBasedKeyStoresFactory>>> implementation uses the + following properties, in the <<ssl-server.xml>> and <<ssl-client.xml>> files, + to configure the keystores and truststores. + +** <<<ssl-server.xml>>> (Shuffle server) Configuration: + + The mapred user should own the <<ssl-server.xml>> file and have exclusive + read access to it. + +*---------------------------------------------+---------------------+-----------------+ +| <<Property>> | <<Default Value>> | <<Explanation>> | +*---------------------------------------------+---------------------+-----------------+ +| <<<ssl.server.keystore.type>>> | <<<jks>>> | Keystore file type | +*---------------------------------------------+---------------------+-----------------+ +| <<<ssl.server.keystore.location>>> | NONE | Keystore file location. The mapred user should own this file and have exclusive read access to it. | +*---------------------------------------------+---------------------+-----------------+ +| <<<ssl.server.keystore.password>>> | NONE | Keystore file password | +*---------------------------------------------+---------------------+-----------------+ +| <<<ssl.server.truststore.type>>> | <<<jks>>> | Truststore file type | +*---------------------------------------------+---------------------+-----------------+ +| <<<ssl.server.truststore.location>>> | NONE | Truststore file location. The mapred user should own this file and have exclusive read access to it. 
|
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.password>>> | NONE | Truststore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.reload.interval>>> | 10000 | Truststore reload interval, in milliseconds |
+*---------------------------------------------+---------------------+-----------------+
+
+*** Example:
+
+------
+<configuration>
+
+  <!-- Server Certificate Store -->
+  <property>
+    <name>ssl.server.keystore.type</name>
+    <value>jks</value>
+  </property>
+  <property>
+    <name>ssl.server.keystore.location</name>
+    <value>${user.home}/keystores/server-keystore.jks</value>
+  </property>
+  <property>
+    <name>ssl.server.keystore.password</name>
+    <value>serverfoo</value>
+  </property>
+
+  <!-- Server Trust Store -->
+  <property>
+    <name>ssl.server.truststore.type</name>
+    <value>jks</value>
+  </property>
+  <property>
+    <name>ssl.server.truststore.location</name>
+    <value>${user.home}/keystores/truststore.jks</value>
+  </property>
+  <property>
+    <name>ssl.server.truststore.password</name>
+    <value>clientserverbar</value>
+  </property>
+  <property>
+    <name>ssl.server.truststore.reload.interval</name>
+    <value>10000</value>
+  </property>
+</configuration>
+------
+
+** <<<ssl-client.xml>>> (Reducer/Fetcher) Configuration:
+
+  The mapred user should own the <<<ssl-client.xml>>> file and it should have
+  default permissions.
+
+*---------------------------------------------+---------------------+-----------------+
+| <<Property>>                                 | <<Default Value>>   | <<Explanation>> |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.keystore.type>>> | <<<jks>>> | Keystore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.keystore.location>>> | NONE | Keystore file location. The mapred user should own this file and it should have default permissions. |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.keystore.password>>> | NONE | Keystore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.type>>> | <<<jks>>> | Truststore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.location>>> | NONE | Truststore file location. The mapred user should own this file and it should have default permissions.
|
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.password>>> | NONE | Truststore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.reload.interval>>> | 10000 | Truststore reload interval, in milliseconds |
+*---------------------------------------------+---------------------+-----------------+
+
+*** Example:
+
+------
+<configuration>
+
+  <!-- Client Certificate Store -->
+  <property>
+    <name>ssl.client.keystore.type</name>
+    <value>jks</value>
+  </property>
+  <property>
+    <name>ssl.client.keystore.location</name>
+    <value>${user.home}/keystores/client-keystore.jks</value>
+  </property>
+  <property>
+    <name>ssl.client.keystore.password</name>
+    <value>clientfoo</value>
+  </property>
+
+  <!-- Client Trust Store -->
+  <property>
+    <name>ssl.client.truststore.type</name>
+    <value>jks</value>
+  </property>
+  <property>
+    <name>ssl.client.truststore.location</name>
+    <value>${user.home}/keystores/truststore.jks</value>
+  </property>
+  <property>
+    <name>ssl.client.truststore.password</name>
+    <value>clientserverbar</value>
+  </property>
+  <property>
+    <name>ssl.client.truststore.reload.interval</name>
+    <value>10000</value>
+  </property>
+</configuration>
+------
+
+* Activating Encrypted Shuffle
+
+  When you have made the above configuration changes, activate Encrypted
+  Shuffle by restarting all NodeManagers.
+
+  <<IMPORTANT:>> Using encrypted shuffle will incur a significant
+  performance impact. Users should profile this and potentially reserve
+  1 or more cores for encrypted shuffle.
+
+* {ClientCertificates} Client Certificates
+
+  Using Client Certificates does not fully ensure that the client is a
+  reducer task for the job. Currently, Client Certificates (their private key)
+  keystore files must be readable by all users submitting jobs to the cluster.
+  This means that a rogue job could read those keystore files and use
+  the client certificates in them to establish a secure connection with a
+  Shuffle server. However, unless the rogue job has a proper JobToken, it won't
+  be able to retrieve shuffle data from the Shuffle server. A job, using its
+  own JobToken, can only retrieve shuffle data that belongs to itself.
+
+* Reloading Truststores
+
+  By default, the truststores will reload their configuration every 10 seconds.
+  If a new truststore file is copied over the old one, it will be re-read,
+  and its certificates will replace the old ones. This mechanism is useful for
+  adding or removing nodes from the cluster, or for adding or removing trusted
+  clients. In these cases, the client or NodeManager certificate is added to
+  (or removed from) all the truststore files in the system, and the new
+  configuration will be picked up without you having to restart the NodeManager
+  daemons.
+
+* Debugging
+
+  <<NOTE:>> Enable debugging only for troubleshooting, and then only for jobs
+  running on small amounts of data. It is very verbose and slows down jobs by
+  several orders of magnitude. (You might need to increase mapred.task.timeout
+  to prevent jobs from failing because tasks run so slowly.)
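Before turning on verbose SSL debugging, it can help to confirm that a NodeManager's shuffle port is actually answering over TLS once <<<mapreduce.shuffle.ssl.enabled>>> is in effect. The probe below is an illustration only and is not part of this patch: the host name is hypothetical, 8080 is simply the default <<<mapreduce.shuffle.port>>>, and the handshake succeeds only if the JVM already trusts the shuffle certificate (for example via <<<-Djavax.net.ssl.trustStore>>>).

------
import java.io.IOException;
import java.net.URL;
import javax.net.ssl.HttpsURLConnection;

public class ShuffleHttpsProbe {
  public static void main(String[] args) throws IOException {
    // Hypothetical NodeManager host; 8080 is the default shuffle port.
    URL url = new URL("https://nm-host.example.com:8080/");
    HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
    // Probe only: accept any host name so a self-signed test certificate
    // does not fail host verification. Real fetchers rely on SSLFactory
    // and the configured hadoop.ssl.hostname.verifier instead.
    conn.setHostnameVerifier((hostname, session) -> true);
    // The TLS handshake happens here; any HTTP response code proves the
    // port is speaking HTTPS rather than plain HTTP.
    int status = conn.getResponseCode();
    System.out.println("TLS handshake OK, HTTP status: " + status);
  }
}
------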
+
+  To enable SSL debugging in the reducers, set <<<-Djavax.net.debug=all>>> in
+  the <<<mapreduce.reduce.child.java.opts>>> property; for example:
+
+------
+  <property>
+    <name>mapred.reduce.child.java.opts</name>
+    <value>-Xmx200m -Djavax.net.debug=all</value>
+  </property>
+------
+
+  You can do this on a per-job basis, or by means of a cluster-wide setting in
+  the <<<mapred-site.xml>>> file.
+
+  To set this property for the NodeManager, set it in the <<<yarn-env.sh>>> file:
+
+------
+  YARN_NODEMANAGER_OPTS="-Djavax.net.debug=all $YARN_NODEMANAGER_OPTS"
+------

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm Thu Jul 26 13:23:05 2012
@@ -51,3 +51,5 @@ MapReduce NextGen aka YARN aka MRv2

   * {{{./CLIMiniCluster.html}CLI MiniCluster}}
+
+  * {{{./EncryptedShuffle.html}Encrypted Shuffle}}
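One more illustration, tied to the Client Certificates section of the new EncryptedShuffle page: even over HTTPS, every shuffle request is also authenticated with a hash derived from the per-job secret, which is why a client certificate alone does not let a rogue job fetch another job's map outputs. The sketch below is not the SecureShuffleUtils code from this patch; the class name, secret and URL are made up, but it shows the general idea of an HMAC computed over the request URL using plain JCE.

------
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class ShuffleUrlHmacSketch {

  // Sign a shuffle URL with the per-job secret; conceptually, the reduce-side
  // fetcher and the shuffle server each do this on their side of the request.
  static String sign(byte[] jobSecret, String url) throws Exception {
    Mac mac = Mac.getInstance("HmacSHA1");
    mac.init(new SecretKeySpec(jobSecret, "HmacSHA1"));
    return Base64.getUrlEncoder().withoutPadding()
        .encodeToString(mac.doFinal(url.getBytes(StandardCharsets.UTF_8)));
  }

  public static void main(String[] args) throws Exception {
    byte[] jobSecret = "job-token-secret".getBytes(StandardCharsets.UTF_8); // hypothetical secret
    String url = "https://nm-host.example.com:8080/mapOutput?job=job_1&reduce=0"; // hypothetical URL

    String clientHash = sign(jobSecret, url); // computed by the fetcher
    String serverHash = sign(jobSecret, url); // recomputed by the shuffle server

    // Only a requester that holds the job's secret produces a matching hash,
    // whatever client certificate was used for the TLS connection itself.
    System.out.println("request authorized: " + serverHash.equals(clientHash));
  }
}
------

In the patch itself, the fetcher builds the message to sign from the request URL (see <<<SecureShuffleUtils.buildMsgFrom(url)>>> in the Fetcher diff above), and the ShuffleHandler performs the corresponding verification on the server side before serving map output.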