ZanderXu commented on code in PR #4560: URL: https://github.com/apache/hadoop/pull/4560#discussion_r961258064
########## hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java: ########## @@ -1122,41 +1115,50 @@ public void testSelectViaRpcTwoJNsError() throws Exception { * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21. * 5. Journal 1 has some abnormal cases caused slow response. * - * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40. - * Because there is no Edits from id 21 to 40 in the cache of JournalNode0. + * The expected result should contain txn 21 - 40. */ @Test - public void testSelectViaRpcAfterJNJitter() throws Exception { + public void testSelectViaRPCAfterJNJitter() throws Exception { EditLogOutputStream stm = qjm.startLogSegment( 1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); SettableFuture<Void> slowLog = SettableFuture.create(); Mockito.doReturn(slowLog).when(spies.get(0)) - .sendEdits(eq(1L), eq(11L), eq(1), Mockito.any()); + .sendEdits(eq(1L), eq(11L), eq(10), Mockito.any()); + // Successfully write these edits to JN0 ~ JN2 writeTxns(stm, 1, 10); + // Failed write these edits to JN0, but successfully write them to JN1 ~ JN2 writeTxns(stm, 11, 10); - writeTxns(stm, 21, 10); - writeTxns(stm, 31, 10); - ListeningExecutorService service = MoreExecutors.listeningDecorator( - Executors.newSingleThreadExecutor()); - Mockito.doAnswer(invocation -> service.submit( - () -> { - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - EditLogFileOutputStream.writeHeader( - NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION, - new DataOutputStream(byteStream)); - byteStream.write(createTxnData(21, 20)); - Thread.sleep(3000); - return GetJournaledEditsResponseProto.newBuilder() - .setTxnCount(20) - .setEditLog(ByteString.copyFrom(byteStream.toByteArray())) - .build(); - }) - ).when(spies.get(1)).getJournaledEdits(21, + // Successfully write these edits to JN1 ~ JN2 + writeTxns(stm, 21, 20); + + Semaphore semaphore = new 
Semaphore(0); + + Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> { + semaphore.release(1); + @SuppressWarnings("unchecked") + ListenableFuture<GetJournaledEditsResponseProto> result = + (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod(); + return result; + }).when(spies.get(0)).getJournaledEdits(21, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + + Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> { + semaphore.release(1); + @SuppressWarnings("unchecked") + ListenableFuture<GetJournaledEditsResponseProto> result = + (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod(); + return result; + }).when(spies.get(1)).getJournaledEdits(21, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); - GetJournaledEditsResponseProto responseProto = spies.get(2) - .getJournaledEdits(21, 5000).get(); - assertEquals(20, responseProto.getTxnCount()); + Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> { + semaphore.acquire(2); + @SuppressWarnings("unchecked") + ListenableFuture<GetJournaledEditsResponseProto> result = + (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod(); + return result; + }).when(spies.get(2)).getJournaledEdits(21, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); Review Comment: Nice suggestion, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-issues-help@hadoop.apache.org