This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch revert-5158-batchCounter
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 36ed410856925a7f6fa27dac210726d59378eed5
Author: chenglei <cheng...@apache.org>
AuthorDate: Thu Apr 6 21:17:36 2023 +0800

    Revert "HBASE-27778 Incorrect ReplicationSourceWALReader.totalBufferUsed 
may … (#5158)"
    
    This reverts commit a370099aaaeec662209ab967e2d1879622e357c4.
---
 .../regionserver/ReplicationSourceWALReader.java   | 46 ++++--------
 .../replication/regionserver/WALEntryBatch.java    | 13 +---
 .../regionserver/TestBasicWALEntryStream.java      | 84 ----------------------
 3 files changed, 13 insertions(+), 130 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index d52ed86b2ff..4e1d76a9764 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -140,9 +140,11 @@ class ReplicationSourceWALReader extends Thread {
   public void run() {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal 
happened to our stream
+      WALEntryBatch batch = null;
       try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, 
currentPosition,
         source.getWALFileLengthProvider(), source.getSourceMetrics(), 
walGroupId)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while 
we can
+          batch = null;
           if (!source.isPeerEnabled()) {
             Threads.sleep(sleepForRetries);
             continue;
@@ -172,25 +174,14 @@ class ReplicationSourceWALReader extends Thread {
             continue;
           }
           // below are all for hasNext == YES
-          WALEntryBatch batch = createBatch(entryStream);
-          boolean successAddToQueue = false;
-          try {
-            readWALEntries(entryStream, batch);
-            currentPosition = entryStream.getPosition();
-            // need to propagate the batch even it has no entries since it may 
carry the last
-            // sequence id information for serial replication.
-            LOG.debug("Read {} WAL entries eligible for replication", 
batch.getNbEntries());
-            entryBatchQueue.put(batch);
-            successAddToQueue = true;
-            sleepMultiplier = 1;
-          } finally {
-            if (!successAddToQueue) {
-              // batch is not put to 
ReplicationSourceWALReader#entryBatchQueue,so we should
-              // decrease ReplicationSourceWALReader.totalBufferUsed by the 
byte size which
-              // acquired in ReplicationSourceWALReader.acquireBufferQuota.
-              this.releaseBufferQuota(batch);
-            }
-          }
+          batch = createBatch(entryStream);
+          readWALEntries(entryStream, batch);
+          currentPosition = entryStream.getPosition();
+          // need to propagate the batch even it has no entries since it may 
carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", 
batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
         }
       } catch (WALEntryFilterRetryableException e) {
         // here we have to recreate the WALEntryStream, as when filtering, we 
have already called
@@ -221,7 +212,7 @@ class ReplicationSourceWALReader extends Thread {
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
     batch.addEntry(entry, entrySize);
     updateBatchStats(batch, entry, entrySize);
-    boolean totalBufferTooLarge = acquireBufferQuota(batch, 
entrySizeExcludeBulkLoad);
+    boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
 
     // Stop if too many entries or too big
     return totalBufferTooLarge || batch.getHeapSize() >= 
replicationBatchSizeCapacity
@@ -439,26 +430,13 @@ class ReplicationSourceWALReader extends Thread {
    * @param size delta size for grown buffer
    * @return true if we should clear buffer and push all
    */
-  private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
+  private boolean acquireBufferQuota(long size) {
     long newBufferUsed = totalBufferUsed.addAndGet(size);
     // Record the new buffer usage
     
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
-    walEntryBatch.incrementUsedBufferSize(size);
     return newBufferUsed >= totalBufferQuota;
   }
 
-  /**
-   * To release the buffer quota of {@link WALEntryBatch} which acquired by
-   * {@link ReplicationSourceWALReader#acquireBufferQuota}
-   */
-  private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
-    long usedBufferSize = walEntryBatch.getUsedBufferSize();
-    if (usedBufferSize > 0) {
-      long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize);
-      
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
-    }
-  }
-
   /** Returns whether the reader thread is running */
   public boolean isReaderRunning() {
     return isReaderRunning && !isInterrupted();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 32a149db9cd..b5ef0f92bcc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -52,9 +52,6 @@ class WALEntryBatch {
   private Map<String, Long> lastSeqIds = new HashMap<>();
   // indicate that this is the end of the current file
   private boolean endOfFile;
-  // indicate the buffer size used, which is added to
-  // ReplicationSourceWALReader.totalBufferUsed
-  private long usedBufferSize;
 
   /**
    * @param lastWalPath Path of the WAL the last entry in this batch was read 
from
@@ -156,19 +153,11 @@ class WALEntryBatch {
     lastSeqIds.put(region, sequenceId);
   }
 
-  public void incrementUsedBufferSize(long increment) {
-    usedBufferSize += increment;
-  }
-
-  public long getUsedBufferSize() {
-    return this.usedBufferSize;
-  }
-
   @Override
   public String toString() {
     return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", 
lastWalPath=" + lastWalPath
       + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + 
", nbHFiles="
       + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + 
", endOfFile="
-      + endOfFile + ",usedBufferSize=" + usedBufferSize + "]";
+      + endOfFile + "]";
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index 01f0659de58..efd76854250 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -308,14 +308,6 @@ public abstract class TestBasicWALEntryStream extends 
WALEntryStreamTestBase {
     when(source.isRecovered()).thenReturn(recovered);
     MetricsReplicationGlobalSourceSource globalMetrics =
       Mockito.mock(MetricsReplicationGlobalSourceSource.class);
-    final AtomicLong bufferUsedCounter = new AtomicLong(0);
-    Mockito.doAnswer((invocationOnMock) -> {
-      bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
-      return null;
-    }).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
-    when(globalMetrics.getWALReaderEditsBufferBytes())
-      .then(invocationOnMock -> bufferUsedCounter.get());
-
     when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
     return source;
   }
@@ -799,80 +791,4 @@ public abstract class TestBasicWALEntryStream extends 
WALEntryStreamTestBase {
     Waiter.waitFor(localConf, 10000,
       (Waiter.Predicate<Exception>) () -> 
logQueue.getQueueSize(fakeWalGroupId) == 1);
   }
-
-  /**
-   * This test is for HBASE-27778, when {@link WALEntryFilter#filter} throws 
exception for some
-   * entries in {@link WALEntryBatch},{@link 
ReplicationSourceWALReader#totalBufferUsed} should be
-   * decreased because {@link WALEntryBatch} is not put to
-   * {@link ReplicationSourceWALReader#entryBatchQueue}.
-   */
-  @Test
-  public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() 
throws Exception {
-    appendEntriesToLogAndSync(3);
-    // get ending position
-    long position;
-    try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), 
fakeWalGroupId)) {
-      for (int i = 0; i < 3; i++) {
-        assertNotNull(next(entryStream));
-      }
-      position = entryStream.getPosition();
-    }
-
-    Path walPath = getQueue().peek();
-    int maxThrowExceptionCount = 3;
-
-    ReplicationSource source = mockReplicationSource(false, CONF);
-    when(source.isPeerEnabled()).thenReturn(true);
-    PartialWALEntryFailingWALEntryFilter walEntryFilter =
-      new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, 3);
-    ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, CONF, logQueue, 0, walEntryFilter, 
source, fakeWalGroupId);
-    reader.start();
-    WALEntryBatch entryBatch = reader.take();
-
-    assertNotNull(entryBatch);
-    assertEquals(3, entryBatch.getWalEntries().size());
-    long sum = entryBatch.getWalEntries().stream()
-      
.mapToLong(ReplicationSourceWALReader::getEntrySizeExcludeBulkLoad).sum();
-    assertEquals(position, entryBatch.getLastWalPosition());
-    assertEquals(walPath, entryBatch.getLastWalPath());
-    assertEquals(3, entryBatch.getNbRowKeys());
-    assertEquals(sum, source.getSourceManager().getTotalBufferUsed().get());
-    assertEquals(sum, 
source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
-    assertEquals(maxThrowExceptionCount, 
walEntryFilter.getThrowExceptionCount());
-    assertNull(reader.poll(10));
-  }
-
-  private static class PartialWALEntryFailingWALEntryFilter implements 
WALEntryFilter {
-    private int filteredWALEntryCount = -1;
-    private int walEntryCount = 0;
-    private int throwExceptionCount = -1;
-    private int maxThrowExceptionCount;
-
-    public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int 
walEntryCount) {
-      this.maxThrowExceptionCount = throwExceptionLimit;
-      this.walEntryCount = walEntryCount;
-    }
-
-    @Override
-    public Entry filter(Entry entry) {
-      filteredWALEntryCount++;
-      if (filteredWALEntryCount < walEntryCount - 1) {
-        return entry;
-      }
-
-      filteredWALEntryCount = -1;
-      throwExceptionCount++;
-      if (throwExceptionCount <= maxThrowExceptionCount - 1) {
-        throw new WALEntryFilterRetryableException("failing filter");
-      }
-      return entry;
-    }
-
-    public int getThrowExceptionCount() {
-      return throwExceptionCount;
-    }
-  }
-
 }

Reply via email to