Repository: hadoop Updated Branches: refs/heads/branch-2.8 2d4629f38 -> 9d3ea2c0e
YARN-7244. ShuffleHandler is not aware of disks that are added. Contributed by Kuhu Shukla Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d3ea2c0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d3ea2c0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d3ea2c0 Branch: refs/heads/branch-2.8 Commit: 9d3ea2c0ed217b31c06bb136b9ebf9a88fc38c11 Parents: 2d4629f Author: Jason Lowe <jl...@apache.org> Authored: Mon Oct 30 16:08:14 2017 -0500 Committer: Jason Lowe <jl...@apache.org> Committed: Mon Oct 30 16:08:14 2017 -0500 ---------------------------------------------------------------------- .../apache/hadoop/mapred/ShuffleHandler.java | 14 ++- .../hadoop/mapred/TestShuffleHandler.java | 100 ++++++++++++++----- .../server/api/AuxiliaryLocalPathHandler.java | 58 +++++++++++ .../yarn/server/api/AuxiliaryService.java | 21 ++++ .../nodemanager/LocalDirsHandlerService.java | 4 + .../containermanager/AuxServices.java | 6 +- .../containermanager/ContainerManagerImpl.java | 35 ++++++- .../containermanager/TestAuxServices.java | 21 ++-- .../containermanager/TestContainerManager.java | 51 +++++++++- 9 files changed, 261 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3ea2c0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 4b367c1..c81f25b 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -57,7 +57,6 @@ import javax.crypto.SecretKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -83,7 +82,6 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; @@ -838,8 +836,6 @@ public class ShuffleHandler extends AuxiliaryService { private final Configuration conf; private final IndexCache indexCache; - private final LocalDirAllocator lDirAlloc = - new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); private int port; public Shuffle(Configuration conf) { @@ -1064,13 +1060,14 @@ public class ShuffleHandler extends AuxiliaryService { protected MapOutputInfo getMapOutputInfo(String base, String mapId, int reduce, String user) throws IOException { // Index file - Path indexFileName = - lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf); + Path indexFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(base + "/file.out.index"); IndexRecord info = indexCache.getIndexInformation(mapId, reduce, indexFileName, user); Path mapOutputFileName = - lDirAlloc.getLocalPathToRead(base + "/file.out", conf); + 
getAuxiliaryLocalPathHandler().getLocalPathForRead(base + + "/file.out"); if (LOG.isDebugEnabled()) { LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName); } @@ -1092,7 +1089,8 @@ public class ShuffleHandler extends AuxiliaryService { } // Index file Path indexFileName = - lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf); + getAuxiliaryLocalPathHandler().getLocalPathForRead( + base + "/file.out.index"); IndexRecord info = indexCache.getIndexInformation(mapId, reduce, indexFileName, user); ShuffleHeader header = http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3ea2c0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index ade1df5..c9e40d1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -70,12 +70,15 @@ import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.PureJavaCrc32; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import 
org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.records.Version; import org.jboss.netty.channel.Channel; @@ -92,6 +95,9 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpMethod; import org.junit.Assert; import org.junit.Test; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.mockito.Mockito; @@ -100,8 +106,12 @@ import org.mortbay.jetty.HttpHeaders; public class TestShuffleHandler { static final long MiB = 1024 * 1024; private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class); + private static final File ABS_LOG_DIR = GenericTestUtils.getTestDir( + TestShuffleHandler.class.getSimpleName() + "LocDir"); class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler { + private AuxiliaryLocalPathHandler pathHandler = + new TestAuxiliaryLocalPathHandler(); @Override protected Shuffle getShuffle(final Configuration conf) { return new Shuffle(conf) { @@ -141,12 +151,36 @@ public class TestShuffleHandler { } }; } + + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } } - private static class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler { - boolean socketKeepAlive = false; + private class TestAuxiliaryLocalPathHandler + implements AuxiliaryLocalPathHandler { + @Override + public Path getLocalPathForRead(String path) throws IOException { + return new Path(ABS_LOG_DIR.getAbsolutePath(), path); + } + + @Override + public Path getLocalPathForWrite(String 
path) throws IOException { + return new Path(ABS_LOG_DIR.getAbsolutePath()); + } @Override + public Path getLocalPathForWrite(String path, long size) + throws IOException { + return new Path(ABS_LOG_DIR.getAbsolutePath()); + } + } + + private static class MockShuffleHandler2 extends + org.apache.hadoop.mapred.ShuffleHandler { + boolean socketKeepAlive = false; + @Override protected Shuffle getShuffle(final Configuration conf) { return new Shuffle(conf) { @Override @@ -479,6 +513,11 @@ public class TestShuffleHandler { conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); HttpURLConnection conn = null; MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(); + AuxiliaryLocalPathHandler pathHandler = + mock(AuxiliaryLocalPathHandler.class); + when(pathHandler.getLocalPathForRead(anyString())).thenThrow( + new DiskChecker.DiskErrorException("Test")); + shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler); try { shuffleHandler.init(conf); shuffleHandler.start(); @@ -661,19 +700,16 @@ public class TestShuffleHandler { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", - TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); LOG.info(appId.toString()); String appAttemptId = "attempt_12345_1_m_1_0"; String user = "randomUser"; String reducerId = "0"; List<File> fileMap = new ArrayList<File>(); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId, conf, fileMap); ShuffleHandler shuffleHandler = new ShuffleHandler() { - @Override protected Shuffle getShuffle(Configuration conf) { // replace the shuffle 
handler with one stubbed for testing @@ -689,6 +725,8 @@ public class TestShuffleHandler { }; } }; + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); + shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler); shuffleHandler.init(conf); try { shuffleHandler.start(); @@ -733,7 +771,7 @@ public class TestShuffleHandler { Assert.assertTrue((new String(byteArr)).contains(message)); } finally { shuffleHandler.stop(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(ABS_LOG_DIR); } } @@ -794,10 +832,13 @@ public class TestShuffleHandler { final File tmpDir = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestShuffleHandler.class.getName()); + ShuffleHandler shuffle = new ShuffleHandler(); + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); Configuration conf = new Configuration(); conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); - ShuffleHandler shuffle = new ShuffleHandler(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath()); // emulate aux services startup with recovery enabled shuffle.setRecoveryPath(new Path(tmpDir.toString())); tmpDir.mkdirs(); @@ -823,6 +864,7 @@ public class TestShuffleHandler { // emulate shuffle handler restart shuffle.close(); shuffle = new ShuffleHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); shuffle.start(); @@ -865,6 +907,9 @@ public class TestShuffleHandler { conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); ShuffleHandler shuffle = new ShuffleHandler(); + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, 
ABS_LOG_DIR.getAbsolutePath()); // emulate aux services startup with recovery enabled shuffle.setRecoveryPath(new Path(tmpDir.toString())); tmpDir.mkdirs(); @@ -890,6 +935,7 @@ public class TestShuffleHandler { // emulate shuffle handler restart shuffle.close(); shuffle = new ShuffleHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); shuffle.start(); @@ -907,6 +953,7 @@ public class TestShuffleHandler { Assert.assertEquals(version11, shuffle.loadVersion()); shuffle.close(); shuffle = new ShuffleHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); shuffle.start(); @@ -923,6 +970,7 @@ public class TestShuffleHandler { Assert.assertEquals(version21, shuffle.loadVersion()); shuffle.close(); shuffle = new ShuffleHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); @@ -972,16 +1020,15 @@ public class TestShuffleHandler { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", TestShuffleHandler.class. 
- getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); String appAttemptId = "attempt_12345_1_m_1_0"; String user = "randomUser"; String reducerId = "0"; List<File> fileMap = new ArrayList<File>(); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId, conf, fileMap); + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); ShuffleHandler shuffleHandler = new ShuffleHandler() { @Override protected Shuffle getShuffle(Configuration conf) { @@ -1025,6 +1072,7 @@ public class TestShuffleHandler { }; } }; + shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler); shuffleHandler.init(conf); try { shuffleHandler.start(); @@ -1063,7 +1111,7 @@ public class TestShuffleHandler { 0, failures.size()); } finally { shuffleHandler.stop(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(ABS_LOG_DIR); } } @@ -1073,10 +1121,10 @@ public class TestShuffleHandler { new ArrayList<ShuffleHandler.ReduceMapFileCount>(); final ChannelHandlerContext mockCtx = - Mockito.mock(ChannelHandlerContext.class); - final MessageEvent mockEvt = Mockito.mock(MessageEvent.class); - final Channel mockCh = Mockito.mock(AbstractChannel.class); - final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class); + mock(ChannelHandlerContext.class); + final MessageEvent mockEvt = mock(MessageEvent.class); + final Channel mockCh = mock(AbstractChannel.class); + final ChannelPipeline mockPipeline = mock(ChannelPipeline.class); // Mock HttpRequest and ChannelFuture final HttpRequest mockHttpRequest = createMockHttpRequest(); @@ -1087,16 +1135,16 @@ public class TestShuffleHandler { // Mock Netty Channel Context and Channel behavior 
Mockito.doReturn(mockCh).when(mockCtx).getChannel(); - Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline); - Mockito.when(mockPipeline.get( + when(mockCh.getPipeline()).thenReturn(mockPipeline); + when(mockPipeline.get( Mockito.any(String.class))).thenReturn(timerHandler); - Mockito.when(mockCtx.getChannel()).thenReturn(mockCh); + when(mockCtx.getChannel()).thenReturn(mockCh); Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class)); - Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture); + when(mockCh.write(Object.class)).thenReturn(mockFuture); //Mock MessageEvent behavior Mockito.doReturn(mockCh).when(mockEvt).getChannel(); - Mockito.when(mockEvt.getChannel()).thenReturn(mockCh); + when(mockEvt.getChannel()).thenReturn(mockCh); Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage(); final ShuffleHandler sh = new MockShuffleHandler(); @@ -1120,8 +1168,8 @@ public class TestShuffleHandler { public ChannelFuture createMockChannelFuture(Channel mockCh, final List<ShuffleHandler.ReduceMapFileCount> listenerList) { - final ChannelFuture mockFuture = Mockito.mock(ChannelFuture.class); - Mockito.when(mockFuture.getChannel()).thenReturn(mockCh); + final ChannelFuture mockFuture = mock(ChannelFuture.class); + when(mockFuture.getChannel()).thenReturn(mockCh); Mockito.doReturn(true).when(mockFuture).isSuccess(); Mockito.doAnswer(new Answer() { @Override @@ -1139,7 +1187,7 @@ public class TestShuffleHandler { } public HttpRequest createMockHttpRequest() { - HttpRequest mockHttpRequest = Mockito.mock(HttpRequest.class); + HttpRequest mockHttpRequest = mock(HttpRequest.class); Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod(); Mockito.doAnswer(new Answer() { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3ea2c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryLocalPathHandler.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryLocalPathHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryLocalPathHandler.java new file mode 100644 index 0000000..50feecf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryLocalPathHandler.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +/** An Interface that can retrieve local directories to read from or write to. + * Components can implement this interface to link it to + * their own Directory Handler Service + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface AuxiliaryLocalPathHandler { + /** + * Get a path from the local FS for reading for a given Auxiliary Service. 
+ * @param path the requested path + * @return the complete path to the file on a local disk + * @throws IOException if the file read encounters a problem + */ + Path getLocalPathForRead(String path) throws IOException; + + /** + * Get a path from the local FS for writing for a given Auxiliary Service. + * @param path the requested path + * @return the complete path to the file on a local disk + * @throws IOException if the path creation fails + */ + Path getLocalPathForWrite(String path) throws IOException; + + /** + * Get a path from the local FS for writing a file of an estimated size + * for a given Auxiliary Service. + * @param path the requested path + * @param size the size of the file that is going to be written + * @return the complete path to the file on a local disk + * @throws IOException if the path creation fails + */ + Path getLocalPathForWrite(String path, long size) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3ea2c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java index 58b1d4a..79f2ede 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; public abstract class AuxiliaryService extends AbstractService { private Path recoveryPath = null; + private AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; protected AuxiliaryService(String name) { super(name); @@ -123,4
+124,24 @@ public abstract class AuxiliaryService extends AbstractService { public void setRecoveryPath(Path recoveryPath) { this.recoveryPath = recoveryPath; } + + /** + * Method that gets the local dirs path handler for this Auxiliary Service. + * + * @return auxiliaryPathHandler object that is used to read from and write to + * valid local Dirs. + */ + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return this.auxiliaryLocalPathHandler; + } + + /** + * Method that sets the local dirs path handler for this Auxiliary Service. + * + * @param auxiliaryLocalPathHandler the pathHandler for this auxiliary service + */ + public void setAuxiliaryLocalPathHandler( + AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { + this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3ea2c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index 5cc4e19..976df7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -551,6 +551,10 @@ public class LocalDirsHandlerService extends AbstractService { checkWrite); } + public Path getLocalPathForRead(String pathStr) throws 
IOException { + return getPathToRead(pathStr, getLocalDirsForRead()); + } + public Path getLogPathForWrite(String pathStr, boolean checkWrite) throws IOException { return logDirsAllocator.getLocalPathForWrite(pathStr, http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3ea2c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index cd5ed88..5b63936 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; @@ -55,15 +56,17 @@ public class AuxServices extends AbstractService protected final Map<String,AuxiliaryService> serviceMap; protected final Map<String,ByteBuffer> serviceMetaData; + private 
final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$"); - public AuxServices() { + public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { super(AuxServices.class.getName()); serviceMap = Collections.synchronizedMap(new HashMap<String,AuxiliaryService>()); serviceMetaData = Collections.synchronizedMap(new HashMap<String,ByteBuffer>()); + this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; // Obtain services from configuration in init() } @@ -134,6 +137,7 @@ public class AuxServices extends AbstractService +"Service Meta Data may have issues unless the refer to " +"the name in the config."); } + s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); addService(sName, s); if (recoveryEnabled) { Path storePath = new Path(stateStoreRoot, sName); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3ea2c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 942f7ac..a46ea12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -42,6 +42,7 @@ import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; @@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; @@ -213,8 +215,10 @@ public class ContainerManagerImpl extends CompositeService implements this.nodeStatusUpdater = nodeStatusUpdater; + AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = + new AuxiliaryLocalPathHandlerImpl(dirsHandler); // Start configurable services - auxiliaryServices = new AuxServices(); + auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler); auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); @@ -1326,6 +1330,35 @@ public class ContainerManagerImpl extends CompositeService implements } } + /** + * Implements AuxiliaryLocalPathHandler. 
+ * It links NodeManager's LocalDirsHandlerService to the Auxiliary Services + */ + static class AuxiliaryLocalPathHandlerImpl + implements AuxiliaryLocalPathHandler { + private LocalDirsHandlerService dirhandlerService; + AuxiliaryLocalPathHandlerImpl( + LocalDirsHandlerService dirhandlerService) { + this.dirhandlerService = dirhandlerService; + } + + @Override + public Path getLocalPathForRead(String path) throws IOException { + return dirhandlerService.getLocalPathForRead(path); + } + + @Override + public Path getLocalPathForWrite(String path) throws IOException { + return dirhandlerService.getLocalPathForWrite(path); + } + + @Override + public Path getLocalPathForWrite(String path, long size) + throws IOException { + return dirhandlerService.getLocalPathForWrite(path, size, false); + } + } + @SuppressWarnings("unchecked") @Override public void handle(ContainerManagerEvent event) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3ea2c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 91466e8..2260358 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -51,6 +51,7 @@ import 
org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; @@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; public class TestAuxServices { private static final Log LOG = LogFactory.getLog(TestAuxServices.class); @@ -66,6 +68,8 @@ public class TestAuxServices { System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestAuxServices.class.getName()); + private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER = + Mockito.mock(AuxiliaryLocalPathHandler.class); static class LightService extends AuxiliaryService implements Service { @@ -160,7 +164,7 @@ public class TestAuxServices { ServiceB.class, Service.class); conf.setInt("A.expected.init", 1); conf.setInt("B.expected.stop", 1); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); aux.start(); @@ -224,7 +228,7 @@ public class TestAuxServices { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); int latch = 1; @@ -236,8 +240,9 @@ public class TestAuxServices { } assertEquals("Invalid mix of services", 6, latch); aux.start(); - 
for (Service s : aux.getServices()) { + for (AuxiliaryService s : aux.getServices()) { assertEquals(STARTED, s.getServiceState()); + assertEquals(s.getAuxiliaryLocalPathHandler(), MOCK_AUX_PATH_HANDLER); } aux.stop(); @@ -255,7 +260,7 @@ public class TestAuxServices { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); int latch = 1; @@ -292,7 +297,7 @@ public class TestAuxServices { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); aux.start(); @@ -305,7 +310,7 @@ public class TestAuxServices { @Test public void testValidAuxServiceName() { - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); Configuration conf = new Configuration(); conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"}); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"), @@ -319,7 +324,7 @@ public class TestAuxServices { } //Test bad auxService Name - final AuxServices aux1 = new AuxServices(); + final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER); conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"}); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"), ServiceA.class, Service.class); @@ -345,7 +350,7 @@ public class TestAuxServices { conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), RecoverableServiceB.class, Service.class); try { - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); Assert.assertEquals(2, 
aux.getServices().size()); File auxStorageDir = new File(TEST_DIR, http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3ea2c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index bb32799..f6c9f67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; @@ -98,10 +99,15 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static 
org.junit.Assert.fail; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestContainerManager extends BaseContainerManagerTest { @@ -265,6 +271,41 @@ public class TestContainerManager extends BaseContainerManagerTest { Assert.assertEquals(null, reader.readLine()); } + @Test (timeout = 10000L) + public void testAuxPathHandler() throws Exception { + File testDir = GenericTestUtils.getTestDir(GenericTestUtils.getTestDir( + TestContainerManager.class.getSimpleName() + "LocDir"). + getAbsolutePath()); + testDir.mkdirs(); + File testFile = new File(testDir, "test"); + testFile.createNewFile(); + YarnConfiguration configuration = new YarnConfiguration(); + configuration.set(YarnConfiguration.NM_LOCAL_DIRS, + testDir.getAbsolutePath()); + LocalDirsHandlerService spyDirHandlerService = + Mockito.spy(new LocalDirsHandlerService()); + spyDirHandlerService.init(configuration); + when(spyDirHandlerService.getConfig()).thenReturn(configuration); + AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = + new ContainerManagerImpl.AuxiliaryLocalPathHandlerImpl( + spyDirHandlerService); + Path p = auxiliaryLocalPathHandler.getLocalPathForRead("test"); + assertTrue(p != null && + !spyDirHandlerService.getLocalDirsForRead().isEmpty()); + + when(spyDirHandlerService.getLocalDirsForRead()).thenReturn( + new ArrayList<String>()); + try { + auxiliaryLocalPathHandler.getLocalPathForRead("test"); + fail("Should not have passed!"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("Could not find")); + } finally { + testFile.delete(); + testDir.delete(); + } + } + //@Test public void testContainerLaunchAndStop() throws IOException, InterruptedException, YarnException { @@ -908,8 +949,8 @@ public class TestContainerManager extends BaseContainerManagerTest { ContainerManagerImpl spyContainerMgr = Mockito.spy(cMgrImpl); UserGroupInformation 
ugInfo = UserGroupInformation.createRemoteUser("a"); - Mockito.when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo); - Mockito.when(spyContainerMgr. + when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo); + when(spyContainerMgr. selectNMTokenIdentifier(ugInfo)).thenReturn(null); strExceptionMsg = ""; @@ -1353,7 +1394,7 @@ public class TestContainerManager extends BaseContainerManagerTest { recordFactory.newRecordInstance(ContainerLaunchContext.class); ContainerLaunchContext spyContainerLaunchContext = Mockito.spy(containerLaunchContext); - Mockito.when(spyContainerLaunchContext.getLocalResources()) + when(spyContainerLaunchContext.getLocalResources()) .thenReturn(localResources); ContainerId cId = createContainerId(0); @@ -1398,7 +1439,7 @@ public class TestContainerManager extends BaseContainerManagerTest { recordFactory.newRecordInstance(ContainerLaunchContext.class); ContainerLaunchContext spyContainerLaunchContext = Mockito.spy(containerLaunchContext); - Mockito.when(spyContainerLaunchContext.getLocalResources()) + when(spyContainerLaunchContext.getLocalResources()) .thenReturn(localResources); ContainerId cId = createContainerId(0); @@ -1443,7 +1484,7 @@ public class TestContainerManager extends BaseContainerManagerTest { recordFactory.newRecordInstance(ContainerLaunchContext.class); ContainerLaunchContext spyContainerLaunchContext = Mockito.spy(containerLaunchContext); - Mockito.when(spyContainerLaunchContext.getLocalResources()) + when(spyContainerLaunchContext.getLocalResources()) .thenReturn(localResources); ContainerId cId = createContainerId(0); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org