YARN-3082. Non-thread-safe access to systemCredentials in NodeHeartbeatResponse processing. Contributed by Anubhav Dhoot.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a06d2d65 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a06d2d65 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a06d2d65 Branch: refs/heads/HDFS-EC Commit: a06d2d65d38ae464d02809b1c6718d20b960ac8b Parents: 8a96c6d Author: Tsuyoshi Ozawa <oz...@apache.org> Authored: Fri Jan 23 16:04:18 2015 +0900 Committer: Zhe Zhang <z...@apache.org> Committed: Mon Jan 26 09:43:29 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../impl/pb/NodeHeartbeatResponsePBImpl.java | 3 +- .../nodemanager/TestNodeStatusUpdater.java | 63 ++++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06d2d65/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bdc31db..7f0628d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -400,6 +400,9 @@ Release 2.7.0 - UNRELEASED YARN-3078. LogCLIHelpers lacks of a blank space before string 'does not exist'. (Sam Liu via ozawa) + YARN-3082. Non thread safe access to systemCredentials in NodeHeartbeatResponse + processing. 
(Anubhav Dhoot via ozawa) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06d2d65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 1e91514..630a5bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -104,7 +104,8 @@ public class NodeHeartbeatResponsePBImpl extends for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) { builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setCredentialsForApp(ProtoUtils.convertToProtoFormat(entry.getValue()))); + .setCredentialsForApp(ProtoUtils.convertToProtoFormat( + entry.getValue().duplicate()))); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06d2d65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 46d7b10..71a420e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,7 +38,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -74,12 +78,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import 
org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -1463,6 +1469,63 @@ public class TestNodeStatusUpdater { nm.stop(); } + @Test + public void testConcurrentAccessToSystemCredentials(){ + final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>(); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]); + ApplicationId applicationId = ApplicationId.newInstance(123456, 120); + testCredentials.put(applicationId, byteBuffer); + + final List<Throwable> exceptions = Collections.synchronizedList(new + ArrayList<Throwable>()); + + final int NUM_THREADS = 10; + final CountDownLatch allDone = new CountDownLatch(NUM_THREADS); + final ExecutorService threadPool = Executors.newFixedThreadPool( + NUM_THREADS); + + final AtomicBoolean stop = new AtomicBoolean(false); + + try { + for (int i = 0; i < NUM_THREADS; i++) { + threadPool.submit(new Runnable() { + @Override + public void run() { + try { + for (int i = 0; i < 100 && !stop.get(); i++) { + NodeHeartbeatResponse nodeHeartBeatResponse = + newNodeHeartbeatResponse(0, NodeAction.NORMAL, + null, null, null, null, 0); + nodeHeartBeatResponse.setSystemCredentialsForApps( + testCredentials); + NodeHeartbeatResponseProto proto = + ((NodeHeartbeatResponsePBImpl)nodeHeartBeatResponse) + .getProto(); + Assert.assertNotNull(proto); + } + } catch (Throwable t) { + exceptions.add(t); + stop.set(true); + } finally { + allDone.countDown(); + } + } + }); + } + + int testTimeout = 2; + Assert.assertTrue("Timeout waiting for more than " + testTimeout + " " + + 
"seconds", + allDone.await(testTimeout, TimeUnit.SECONDS)); + } catch (InterruptedException ie) { + exceptions.add(ie); + } finally { + threadPool.shutdownNow(); + } + Assert.assertTrue("Test failed with exception(s)" + exceptions, + exceptions.isEmpty()); + } + // Add new containers info into NM context each time node heart beats. private class MyNMContext extends NMContext {