This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 76887c1b4978 Revert "HDFS-16016. BPServiceActor to provide new thread to handle IBR (#2998)" (#6457) Contributed by Shilun Fan.
76887c1b4978 is described below

commit 76887c1b4978d7bb092ad2c1897f6f2c4d369a69
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Sat Jan 20 07:51:55 2024 +0800

    Revert "HDFS-16016. BPServiceActor to provide new thread to handle IBR 
(#2998)" (#6457) Contributed by Shilun Fan.
    
    This reverts commit c1bf3cb0.
    
    Reviewed-by: Takanobu Asanuma <tasan...@apache.org>
    Reviewed-by: He Xiaoqiao <hexiaoq...@apache.org>
    Reviewed-by: Ayush Saxena <ayushsax...@apache.org>
    Reviewed-by: Viraj Jasani <vjas...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../hdfs/server/datanode/BPServiceActor.java       | 62 +++-------------------
 .../org/apache/hadoop/hdfs/TestDatanodeReport.java | 17 ++----
 .../datanode/TestIncrementalBlockReports.java      | 24 +++------
 3 files changed, 17 insertions(+), 86 deletions(-)
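
For context, the change being reverted (HDFS-16016) had moved incremental block report (IBR) sending off the heartbeat loop and onto a dedicated daemon thread, serialized by a lock, as the removed lines in the diff below show. The following is a minimal plain-JDK sketch of that pattern, not the HDFS source: names such as IbrSender, sendIBRs() and waitTillNextIBR() are illustrative stand-ins, and the real code in BPServiceActor uses the shaded Guava ThreadFactoryBuilder instead of a lambda ThreadFactory.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;

    class IbrSender {
      private final Object sendIBRLock = new Object();
      private final AtomicBoolean shouldRun = new AtomicBoolean(true);
      private final ExecutorService ibrExecutorService =
          Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "ibr-executor");
            t.setDaemon(true);
            return t;
          });

      void start() {
        // Reverted design: IBRs are handled on their own thread so that the
        // heartbeat loop only has to deal with heartbeats and full block reports.
        ibrExecutorService.submit(() -> {
          while (shouldRun.get()) {
            synchronized (sendIBRLock) {
              sendIBRs();          // stand-in for ibrManager.sendIBRs(...)
            }
            waitTillNextIBR();     // stand-in for ibrManager.waitTillNextIBR(...)
          }
        });
      }

      void stop() {
        shouldRun.set(false);
        if (!ibrExecutorService.isShutdown()) {
          ibrExecutorService.shutdownNow();
        }
      }

      private void sendIBRs() { /* send pending incremental block reports */ }

      private void waitTillNextIBR() {
        // Block until more IBR work arrives or the heartbeat interval elapses.
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }

The revert removes this executor and lock and goes back to sending IBRs inline from offerService(), as the added lines in the diff below show.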

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index b552fa277d04..4bac0d8fb47f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -36,8 +36,6 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -73,7 +71,6 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
@@ -103,8 +100,6 @@ class BPServiceActor implements Runnable {
   
   volatile long lastCacheReport = 0;
   private final Scheduler scheduler;
-  private final Object sendIBRLock;
-  private final ExecutorService ibrExecutorService;
 
   Thread bpThread;
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
@@ -161,10 +156,6 @@ class BPServiceActor implements Runnable {
     }
     commandProcessingThread = new CommandProcessingThread(this);
     commandProcessingThread.start();
-    sendIBRLock = new Object();
-    ibrExecutorService = Executors.newSingleThreadExecutor(
-        new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("ibr-executor-%d").build());
   }
 
   public DatanodeRegistration getBpRegistration() {
@@ -397,10 +388,8 @@ class BPServiceActor implements Runnable {
     // we have a chance that we will miss the delHint information
     // or we will report an RBW replica after the BlockReport already reports
     // a FINALIZED one.
-    synchronized (sendIBRLock) {
-      ibrManager.sendIBRs(bpNamenode, bpRegistration,
-          bpos.getBlockPoolId(), getRpcMetricSuffix());
-    }
+    ibrManager.sendIBRs(bpNamenode, bpRegistration,
+        bpos.getBlockPoolId(), getRpcMetricSuffix());
 
     long brCreateStartTime = monotonicNow();
     Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@@ -633,9 +622,6 @@ class BPServiceActor implements Runnable {
     if (commandProcessingThread != null) {
       commandProcessingThread.interrupt();
     }
-    if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) {
-      ibrExecutorService.shutdownNow();
-    }
   }
   
   //This must be called only by blockPoolManager
@@ -650,18 +636,13 @@ class BPServiceActor implements Runnable {
     } catch (InterruptedException ie) { }
   }
   
-  // Cleanup method to be called by current thread before exiting.
-  // Any Thread / ExecutorService started by BPServiceActor can be shutdown
-  // here.
+  //Cleanup method to be called by current thread before exiting.
   private synchronized void cleanUp() {
     
     shouldServiceRun = false;
     IOUtils.cleanupWithLogger(null, bpNamenode);
     IOUtils.cleanupWithLogger(null, lifelineSender);
     bpos.shutdownActor(this);
-    if (!ibrExecutorService.isShutdown()) {
-      ibrExecutorService.shutdownNow();
-    }
   }
 
  private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
@@ -757,6 +738,11 @@ class BPServiceActor implements Runnable {
             isSlownode = resp.getIsSlownode();
           }
         }
+        if (!dn.areIBRDisabledForTests() &&
+            (ibrManager.sendImmediately()|| sendHeartbeat)) {
+          ibrManager.sendIBRs(bpNamenode, bpRegistration,
+              bpos.getBlockPoolId(), getRpcMetricSuffix());
+        }
 
         List<DatanodeCommand> cmds = null;
         boolean forceFullBr =
@@ -923,10 +909,6 @@ class BPServiceActor implements Runnable {
         initialRegistrationComplete.countDown();
       }
 
-      // IBR tasks to be handled separately from offerService() in order to
-      // improve performance of offerService(), which can now focus only on
-      // FBR and heartbeat.
-      ibrExecutorService.submit(new IBRTaskHandler());
       while (shouldRun()) {
         try {
           offerService();
@@ -1159,34 +1141,6 @@ class BPServiceActor implements Runnable {
     }
   }
 
-  class IBRTaskHandler implements Runnable {
-
-    @Override
-    public void run() {
-      LOG.info("Starting IBR Task Handler.");
-      while (shouldRun()) {
-        try {
-          final long startTime = scheduler.monotonicNow();
-          final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
-          if (!dn.areIBRDisabledForTests() &&
-              (ibrManager.sendImmediately() || sendHeartbeat)) {
-            synchronized (sendIBRLock) {
-              ibrManager.sendIBRs(bpNamenode, bpRegistration,
-                  bpos.getBlockPoolId(), getRpcMetricSuffix());
-            }
-          }
-          // There is no work to do; sleep until heartbeat timer elapses,
-          // or work arrives, and then iterate again.
-          ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
-        } catch (Throwable t) {
-          LOG.error("Exception in IBRTaskHandler.", t);
-          sleepAndLogInterrupts(5000, "offering IBR service");
-        }
-      }
-    }
-
-  }
-
   /**
    * Utility class that wraps the timestamp computations for scheduling
    * heartbeats and block reports.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
index 239555a8b006..a844e1727b0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
@@ -172,19 +172,8 @@ public class TestDatanodeReport {
         // all bad datanodes
       }
       cluster.triggerHeartbeats(); // IBR delete ack
-      int retries = 0;
-      while (true) {
-        lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
-        if (0 != lb.getLocations().length) {
-          retries++;
-          if (retries > 7) {
-            Assert.fail("getLocatedBlocks failed after 7 retries");
-          }
-          Thread.sleep(2000);
-        } else {
-          break;
-        }
-      }
+      lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
+      assertEquals(0, lb.getLocations().length);
     } finally {
       cluster.shutdown();
     }
@@ -234,4 +223,4 @@ public class TestDatanodeReport {
     throw new IllegalStateException("Datnode " + id + " not in datanode list: "
         + datanodes);
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
index e848cbfb37ff..4221ecaf2a06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 
-import org.mockito.exceptions.base.MockitoAssertionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -157,7 +156,7 @@ public class TestIncrementalBlockReports {
 
       // Sleep for a very short time since IBR is generated
       // asynchronously.
-      Thread.sleep(1000);
+      Thread.sleep(2000);
 
       // Ensure that no block report was generated immediately.
       // Deleted blocks are reported when the IBR timer elapses.
@@ -168,24 +167,13 @@ public class TestIncrementalBlockReports {
 
       // Trigger a heartbeat, this also triggers an IBR.
       DataNodeTestUtils.triggerHeartbeat(singletonDn);
+      Thread.sleep(2000);
 
       // Ensure that the deleted block is reported.
-      int retries = 0;
-      while (true) {
-        try {
-          Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted(
-              any(DatanodeRegistration.class),
-              anyString(),
-              any(StorageReceivedDeletedBlocks[].class));
-          break;
-        } catch (MockitoAssertionError e) {
-          if (retries > 7) {
-            throw e;
-          }
-          retries++;
-          Thread.sleep(2000);
-        }
-      }
+      Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
 
     } finally {
       cluster.shutdown();
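
With the revert in place, the test above asserts a single blockReceivedAndDeleted call directly after a fixed sleep, instead of retrying the verification in a loop. The following is a small, self-contained sketch of that Mockito verification style only; it uses a hypothetical ReportTarget interface rather than the real DatanodeProtocol and StorageReceivedDeletedBlocks classes.

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    public class SingleIbrVerificationSketch {

      // Hypothetical stand-in for the NameNode-side protocol spied on in the test.
      interface ReportTarget {
        void blockReceivedAndDeleted(String blockPoolId, String storageId);
      }

      public static void main(String[] args) {
        ReportTarget nnSpy = mock(ReportTarget.class);

        // Simulate exactly one IBR being delivered after the heartbeat trigger.
        nnSpy.blockReceivedAndDeleted("bp-1", "storage-1");

        // Post-revert style: assert exactly one IBR RPC, with no retry loop
        // around the verification.
        verify(nnSpy, times(1)).blockReceivedAndDeleted(anyString(), anyString());
      }
    }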

