[ 
https://issues.apache.org/jira/browse/HDFS-16659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599240#comment-17599240
 ] 

ASF GitHub Bot commented on HDFS-16659:
---------------------------------------

ZanderXu commented on code in PR #4560:
URL: https://github.com/apache/hadoop/pull/4560#discussion_r961258064


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -1122,41 +1115,50 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
    * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21.
    * 5. Journal 1 has some abnormal cases caused slow response.
    *
-   * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40.
-   * Because there is no Edits from id 21 to 40 in the cache of JournalNode0.
+   * The expected result should contain txn 21 - 40.
    */
   @Test
-  public void testSelectViaRpcAfterJNJitter() throws Exception {
+  public void testSelectViaRPCAfterJNJitter() throws Exception {
     EditLogOutputStream stm = qjm.startLogSegment(
         1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
     SettableFuture<Void> slowLog = SettableFuture.create();
     Mockito.doReturn(slowLog).when(spies.get(0))
-        .sendEdits(eq(1L), eq(11L), eq(1), Mockito.any());
+        .sendEdits(eq(1L), eq(11L), eq(10), Mockito.any());
+    // Successfully write these edits to JN0 ~ JN2
     writeTxns(stm, 1, 10);
+    // Failed write these edits to JN0, but successfully write them to JN1 ~ JN2
     writeTxns(stm, 11, 10);
-    writeTxns(stm, 21, 10);
-    writeTxns(stm, 31, 10);
-    ListeningExecutorService service = MoreExecutors.listeningDecorator(
-        Executors.newSingleThreadExecutor());
-    Mockito.doAnswer(invocation -> service.submit(
-        () -> {
-          ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-          EditLogFileOutputStream.writeHeader(
-              NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
-              new DataOutputStream(byteStream));
-          byteStream.write(createTxnData(21, 20));
-          Thread.sleep(3000);
-          return GetJournaledEditsResponseProto.newBuilder()
-              .setTxnCount(20)
-              .setEditLog(ByteString.copyFrom(byteStream.toByteArray()))
-              .build();
-        })
-    ).when(spies.get(1)).getJournaledEdits(21,
+    // Successfully write these edits to JN1 ~ JN2
+    writeTxns(stm, 21, 20);
+
+    Semaphore semaphore = new Semaphore(0);
+
+    Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
+      semaphore.release(1);
+      @SuppressWarnings("unchecked")
+      ListenableFuture<GetJournaledEditsResponseProto> result =
+          (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
+      return result;
+    }).when(spies.get(0)).getJournaledEdits(21,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
+      semaphore.release(1);
+      @SuppressWarnings("unchecked")
+      ListenableFuture<GetJournaledEditsResponseProto> result =
+          (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
+      return result;
+    }).when(spies.get(1)).getJournaledEdits(21,
         QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
 
-    GetJournaledEditsResponseProto responseProto = spies.get(2)
-        .getJournaledEdits(21, 5000).get();
-    assertEquals(20, responseProto.getTxnCount());
+    Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
+      semaphore.acquire(2);
+      @SuppressWarnings("unchecked")
+      ListenableFuture<GetJournaledEditsResponseProto> result =
+          (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
+      return result;
+    }).when(spies.get(2)).getJournaledEdits(21,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);

Review Comment:
   Nice suggestion, thanks.
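
For readers following the diff, the semaphore pattern it adopts can be sketched in isolation. This is a minimal, self-contained illustration and not the test itself: the class name `SemaphoreOrderingSketch`, the thread pool, and the string labels are all hypothetical stand-ins for the spied JournalNode loggers. Two "fast" responders each release one permit, and the third responder blocks until it can acquire both, so it always proceeds last without any `Thread.sleep`. In the actual test the permits are released before `callRealMethod()`, so the guarantee there is only that JN2's call starts after the calls on JN0 and JN1 have been invoked.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreOrderingSketch {

  /** Runs three simulated responders and returns the order in which they proceeded. */
  public static List<String> run() throws InterruptedException {
    Semaphore semaphore = new Semaphore(0);
    List<String> order = new CopyOnWriteArrayList<>();
    ExecutorService pool = Executors.newFixedThreadPool(3);

    // Submit the gated responder first to show that submission order does not matter.
    pool.submit(() -> {
      try {
        // Blocks until both fast responders have released a permit.
        semaphore.acquire(2);
        order.add("JN2");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    for (String name : new String[] {"JN0", "JN1"}) {
      pool.submit(() -> {
        order.add(name);       // record first, then let the gated responder advance
        semaphore.release(1);
      });
    }

    pool.shutdown();
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("responders did not finish in time");
    }
    return order;
  }

  public static void main(String[] args) throws InterruptedException {
    // JN0 and JN1 may appear in either order; JN2 is always last.
    System.out.println(run());
  }
}
```

Compared with the removed `Thread.sleep(3000)`, gating on permits makes the ordering deterministic and avoids a fixed delay.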





> JournalNode should throw NewerTxnIdException if SinceTxId is bigger than 
> HighestWrittenTxId
> -------------------------------------------------------------------------------------------
>
>                 Key: HDFS-16659
>                 URL: https://issues.apache.org/jira/browse/HDFS-16659
>             Project: Hadoop HDFS
>          Issue Type: Bug
>            Reporter: ZanderXu
>            Assignee: ZanderXu
>            Priority: Critical
>              Labels: pull-request-available
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> JournalNode should throw `CacheMissException` if `sinceTxId` is greater than 
> `highestWrittenTxId` while handling a `getJournaledEdits` RPC from NNs. 
> With the current logic, in some corner cases the in-progress EditLogTailer 
> cannot replay any Edits from the JournalNodes, so the ObserverNameNode cannot 
> handle requests from clients.
> Suppose there are 3 JournalNodes, JN0 ~ JN2.
> * JN0 hits an abnormal condition while the Active NameNode is syncing 10 
> Edits with first txid 11
> * NameNode ignores the abnormal JN0 and continues to sync Edits to JN1 and 
> JN2
> * JN0 returns to health
> * NameNode continues by syncing 10 Edits with first txid 21.
> * At this point, there are no Edits 11 ~ 30 in the cache of JN0
> * Observer NameNode tries to select EditLogInputStream through 
> `getJournaledEdits` with since txId 21
> * JN2 hits an abnormal condition that causes a slow response
> The expected result is: the response should contain 10 Edits from txId 21 to 
> txId 30 from JN1 and JN2, because the Active NameNode successfully wrote these 
> Edits to JN1 and JN2 and failed to write them to JN0.
> But in the current implementation, the response is [Response(0) from JN0, 
> Response(10) from JN1], because an abnormal condition in JN2, such as GC or a 
> bad network, causes a slow response. So `maxAllowedTxns` will be 0 and the 
> NameNode will not replay any Edits.
> As above, the root cause is that JournalNode should throw a cache miss 
> exception when `sinceTxId` is greater than `highestWrittenTxId`.
> The buggy code is shown below:
> {code:java}
> if (sinceTxId > getHighestWrittenTxId()) {
>     // Requested edits that don't exist yet; short-circuit the cache here
>     metrics.rpcEmptyResponses.incr();
>     return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
> }
> {code}
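
To make the arithmetic in the description concrete, here is a minimal sketch of how a quorum-durable transaction count can be derived from per-JournalNode response counts. It assumes a simplified rule (sort the counts and take the value that at least a majority of responders reached); the class `QuorumTxnCountSketch` and its method are hypothetical and are not claimed to match the code in `QuorumJournalManager` line for line. With responses of 0 from JN0 and 10 from JN1, the result is 0. If JN0 instead fails with an exception, its response is not counted, and the caller has to wait for JN2, which yields 10.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class QuorumTxnCountSketch {

  /**
   * Returns the largest txn count that at least {@code majoritySize}
   * responders reported (hypothetical helper, simplified for illustration).
   */
  public static int maxAllowedTxns(List<Integer> responseCounts, int majoritySize) {
    if (responseCounts.size() < majoritySize) {
      throw new IllegalArgumentException("not enough responses for a quorum");
    }
    List<Integer> sorted = new ArrayList<>(responseCounts);
    Collections.sort(sorted);
    // After an ascending sort, the last majoritySize entries are all
    // >= the entry at this index, so that entry is safe for a quorum.
    return sorted.get(sorted.size() - majoritySize);
  }

  public static void main(String[] args) {
    // Current behaviour: JN0 answers with 0 txns, JN1 with 10, JN2 is slow.
    System.out.println(maxAllowedTxns(Arrays.asList(0, 10), 2));  // prints 0

    // Proposed behaviour: JN0 throws, so only JN1 and JN2 are counted.
    System.out.println(maxAllowedTxns(Arrays.asList(10, 10), 2)); // prints 10
  }
}
```

The sketch shows why an empty but successful response is harmful here: it is indistinguishable from a JournalNode that is fully caught up with nothing new to return, so it drags the quorum count down to 0.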



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
