Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java?rev=1077456&r1=1077455&r2=1077456&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java Fri Mar 4 04:16:53 2011 @@ -26,11 +26,18 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; + +import javax.crypto.SecretKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -41,6 +48,11 @@ import org.apache.hadoop.mapred.RecordRe import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskLog; +import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -82,6 +94,18 @@ class Application<K1 extends WritableCom env.put("TMPDIR", System.getProperty("java.io.tmpdir")); env.put("hadoop.pipes.command.port", Integer.toString(serverSocket.getLocalPort())); + + //Add token to the environment if security is enabled + Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf + .getCredentials()); + // This password is used as shared secret key between this application and + // child pipes process + byte[] password = jobToken.getPassword(); + String localPasswordFile = conf.getJobLocalDir() + Path.SEPARATOR + + "jobTokenPassword"; + writePasswordToLocalFile(localPasswordFile, password, conf); + env.put("hadoop.pipes.shared.secret.location", localPasswordFile); + List<String> cmd = new ArrayList<String>(); String interpretor = conf.get("hadoop.pipes.executable.interpretor"); if (interpretor != null) { @@ -107,17 +131,52 @@ class Application<K1 extends WritableCom process = runClient(cmd, env); clientSocket = serverSocket.accept(); - handler = new OutputHandler<K2, V2>(output, reporter, recordReader); + + String challenge = getSecurityChallenge(); + String digestToSend = createDigest(password, challenge); + String digestExpected = createDigest(password, digestToSend); + + handler = new OutputHandler<K2, V2>(output, reporter, recordReader, + digestExpected); K2 outputKey = (K2) ReflectionUtils.newInstance(outputKeyClass, conf); V2 outputValue = (V2) ReflectionUtils.newInstance(outputValueClass, conf); downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler, outputKey, outputValue, conf); + + downlink.authenticate(digestToSend, challenge); + waitForAuthentication(); + LOG.debug("Authentication succeeded"); downlink.start(); downlink.setJobConf(conf); } + private String getSecurityChallenge() { + Random rand = new Random(System.currentTimeMillis()); + //Use 4 random integers so as to have 16 random bytes. + StringBuilder strBuilder = new StringBuilder(); + strBuilder.append(rand.nextInt(0x7fffffff)); + strBuilder.append(rand.nextInt(0x7fffffff)); + strBuilder.append(rand.nextInt(0x7fffffff)); + strBuilder.append(rand.nextInt(0x7fffffff)); + return strBuilder.toString(); + } + + private void writePasswordToLocalFile(String localPasswordFile, + byte[] password, JobConf conf) throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + Path localPath = new Path(localPasswordFile); + if (localFs.isFile(localPath)) { + LOG.debug("Password file is already created by previous path"); + return; + } + FSDataOutputStream out = FileSystem.create(localFs, localPath, + new FsPermission("400")); + out.write(password); + out.close(); + } + /** * Get the downward protocol object that can send commands down to the * application. @@ -126,7 +185,19 @@ class Application<K1 extends WritableCom DownwardProtocol<K1, V1> getDownlink() { return downlink; } - + + /** + * Wait for authentication response. + * @throws IOException + * @throws InterruptedException + */ + void waitForAuthentication() throws IOException, + InterruptedException { + downlink.flush(); + LOG.debug("Waiting for authentication response"); + handler.waitForAuthentication(); + } + /** * Wait for the application to finish * @return did the application finish correctly? @@ -190,5 +261,11 @@ class Application<K1 extends WritableCom Process result = builder.start(); return result; } + + public static String createDigest(byte[] password, String data) + throws IOException { + SecretKey key = JobTokenSecretManager.createSecretKey(password); + return SecureShuffleUtils.hashFromString(data, key); + } }
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=1077456&r1=1077455&r2=1077456&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Fri Mar 4 04:16:53 2011 @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import javax.crypto.SecretKey; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.BytesWritable; @@ -34,6 +36,8 @@ import org.apache.hadoop.io.WritableComp import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.util.StringUtils; /** @@ -69,13 +73,15 @@ class BinaryProtocol<K1 extends Writable REDUCE_VALUE(7), CLOSE(8), ABORT(9), + AUTHENTICATION_REQ(10), OUTPUT(50), PARTITIONED_OUTPUT(51), STATUS(52), PROGRESS(53), DONE(54), REGISTER_COUNTER(55), - INCREMENT_COUNTER(56); + INCREMENT_COUNTER(56), + AUTHENTICATION_RESP(57); final int code; MessageType(int code) { this.code = code; @@ -90,6 +96,7 @@ class BinaryProtocol<K1 extends Writable private UpwardProtocol<K2, V2> handler; private K2 key; private V2 value; + private boolean authPending = true; public UplinkReaderThread(InputStream stream, UpwardProtocol<K2, V2> handler, @@ -113,7 +120,14 @@ class BinaryProtocol<K1 extends Writable } int cmd = WritableUtils.readVInt(inStream); LOG.debug("Handling uplink command " + cmd); - if (cmd == MessageType.OUTPUT.code) { + if (cmd == MessageType.AUTHENTICATION_RESP.code) { + String digest = Text.readString(inStream); + authPending = !handler.authenticate(digest); + } else if (authPending) { + LOG.warn("Message " + cmd + " received before authentication is " + + "complete. Ignoring"); + continue; + } else if (cmd == MessageType.OUTPUT.code) { readObject(key); readObject(value); handler.output(key, value); @@ -244,6 +258,15 @@ class BinaryProtocol<K1 extends Writable uplink.interrupt(); uplink.join(); } + + public void authenticate(String digest, String challenge) + throws IOException { + LOG.debug("Sending AUTHENTICATION_REQ, digest=" + digest + ", challenge=" + + challenge); + WritableUtils.writeVInt(stream, MessageType.AUTHENTICATION_REQ.code); + Text.writeString(stream, digest); + Text.writeString(stream, challenge); + } public void start() throws IOException { LOG.debug("starting downlink"); @@ -344,5 +367,4 @@ class BinaryProtocol<K1 extends Writable stream.write(buffer.getData(), 0, length); } } - } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/DownwardProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?rev=1077456&r1=1077455&r2=1077456&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Fri Mar 4 04:16:53 2011 @@ -32,6 +32,12 @@ import org.apache.hadoop.mapred.JobConf; */ interface DownwardProtocol<K extends WritableComparable, V extends Writable> { /** + * request authentication + * @throws IOException + */ + void authenticate(String digest, String challenge) throws IOException; + + /** * Start communication * @throws IOException */ Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java?rev=1077456&r1=1077455&r2=1077456&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java Fri Mar 4 04:16:53 2011 @@ -44,21 +44,26 @@ class OutputHandler<K extends WritableCo private OutputCollector<K, V> collector; private float progressValue = 0.0f; private boolean done = false; + private Throwable exception = null; RecordReader<FloatWritable,NullWritable> recordReader = null; private Map<Integer, Counters.Counter> registeredCounters = new HashMap<Integer, Counters.Counter>(); + private String expectedDigest = null; + private boolean digestReceived = false; /** * Create a handler that will handle any records output from the application. * @param collector the "real" collector that takes the output * @param reporter the reporter for reporting progress */ public OutputHandler(OutputCollector<K, V> collector, Reporter reporter, - RecordReader<FloatWritable,NullWritable> recordReader) { + RecordReader<FloatWritable,NullWritable> recordReader, + String expectedDigest) { this.reporter = reporter; this.collector = collector; this.recordReader = recordReader; + this.expectedDigest = expectedDigest; } /** @@ -155,5 +160,32 @@ class OutputHandler<K extends WritableCo throw new IOException("Invalid counter with id: " + id); } } + + public synchronized boolean authenticate(String digest) throws IOException { + boolean success = true; + if (!expectedDigest.equals(digest)) { + exception = new IOException("Authentication Failed: Expected digest=" + + expectedDigest + ", received=" + digestReceived); + success = false; + } + digestReceived = true; + notify(); + return success; + } + /** + * This is called by Application and blocks the thread until + * authentication response is received. + * @throws IOException + * @throws InterruptedException + */ + synchronized void waitForAuthentication() + throws IOException, InterruptedException { + while (digestReceived == false && exception == null) { + wait(); + } + if (exception != null) { + throw new IOException(exception.getMessage()); + } + } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java?rev=1077456&r1=1077455&r2=1077456&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java Fri Mar 4 04:16:53 2011 @@ -88,4 +88,14 @@ interface UpwardProtocol<K extends Writa * @throws IOException */ void incrementCounter(int id, long amount) throws IOException; + + /** + * Handles authentication response from client. + * It must notify the threads waiting for authentication response. + * @param digest + * @return true if authentication is successful + * @throws IOException + */ + boolean authenticate(String digest) throws IOException; + }
