HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing (iwasakims)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9eec6cbe Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9eec6cbe Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9eec6cbe Branch: refs/heads/YARN-1011 Commit: 9eec6cbedcc53e6dd306a4a578a22937ae239260 Parents: 5ff5f67 Author: Masatake Iwasaki <iwasak...@apache.org> Authored: Wed Jan 20 05:00:02 2016 +0900 Committer: Masatake Iwasaki <iwasak...@apache.org> Committed: Wed Jan 20 05:00:02 2016 +0900 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ .../test/java/org/apache/hadoop/ipc/TestIPC.java | 18 ++++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9eec6cbe/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a5dcf6a..85f304a 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -676,6 +676,9 @@ Release 2.9.0 - UNRELEASED BUG FIXES + HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing + (iwasakims) + HADOOP-12655. TestHttpServer.testBindAddress bind port range is wider than expected. (Wei-Chiu Chuang via stevel) http://git-wip-us.apache.org/repos/asf/hadoop/blob/9eec6cbe/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index d6c0658..78dcdcd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -27,6 +27,8 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import java.io.ByteArrayOutputStream; import java.io.DataInput; @@ -71,6 +73,7 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcKind; +import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.ConnectTimeoutException; @@ -84,6 +87,7 @@ import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -703,6 +707,7 @@ public class TestIPC { // goal is to jam a handler with a connection, fill the callq with // connections, in turn jamming the readers - then flood the server and // ensure that the listener blocks when the reader connection queues fill + @SuppressWarnings("unchecked") private void checkBlocking(int readers, int readerQ, int callQ) throws Exception { int handlers = 1; // makes it easier @@ -722,6 +727,9 @@ public class TestIPC { // start server final TestServerQueue server = new TestServerQueue(clients, readers, callQ, handlers, conf); + CallQueueManager<Call> spy = spy( + (CallQueueManager<Call>)Whitebox.getInternalState(server, "callQueue")); + Whitebox.setInternalState(server, "callQueue", spy); final InetSocketAddress addr = NetUtils.getConnectAddress(server); server.start(); @@ -757,12 +765,10 @@ public class TestIPC { if (i==0) { // let first reader block in a call server.firstCallLatch.await(); - } else if (i <= callQ) { - // let subsequent readers jam the callq, will happen immediately - while (server.getCallQueueLen() != i) { - Thread.sleep(1); - } - } // additional threads block the readers trying to add to the callq + } + // wait until reader put a call to callQueue, to make sure all readers + // are blocking on the queue after initialClients threads are started. + verify(spy, timeout(100).times(i + 1)).put(Mockito.<Call>anyObject()); } try {