This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch branch-3.9
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.9 by this push:
     new 2bd1bb7e5 ZOOKEEPER-4794: Reduce the ZKDatabase#committedLog memory usage.
2bd1bb7e5 is described below

commit 2bd1bb7e5d0828d03ddb2cd0d7444530ea10ad88
Author: Yan Zhao <horizo...@apache.org>
AuthorDate: Sat Feb 10 00:32:53 2024 +0800

    ZOOKEEPER-4794: Reduce the ZKDatabase#committedLog memory usage.
    
    Reduce the committed log memory usage.
    Fix ci.
    Reviewers: eolivelli, hangc0276, anmolnar
    Author: horizonzy
    Closes #2115 from horizonzy/reduce-committed-log-memory-usage
    
    (cherry picked from commit 18c78cd10bc02d764a46ac1659b263cf69f2671d)
    Signed-off-by: Andor Molnar <an...@apache.org>
---
 .../java/org/apache/zookeeper/server/Request.java  | 19 ++------
 .../zookeeper/server/TxnLogProposalIterator.java   |  7 ++-
 .../org/apache/zookeeper/server/ZKDatabase.java    | 13 ++----
 .../org/apache/zookeeper/server/quorum/Leader.java | 54 ++++++++++++++++++++--
 .../zookeeper/server/quorum/LearnerHandler.java    |  4 +-
 .../server/quorum/flexible/QuorumOracleMaj.java    | 10 ++--
 .../server/quorum/LeaderWithObserverTest.java      |  5 +-
 .../server/quorum/LearnerHandlerTest.java          | 14 +++---
 .../zookeeper/test/GetProposalFromTxnTest.java     |  2 +-
 .../zookeeper/test/LocalSessionRequestTest.java    |  4 +-
 10 files changed, 82 insertions(+), 50 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 221f12d26..7f38405f2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -19,7 +19,6 @@
 package org.apache.zookeeper.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -169,24 +168,16 @@ public class Request {
                 && this.type != OpCode.createSession;
     }
 
-    private transient byte[] serializeData;
-
-    @SuppressFBWarnings(value = "EI_EXPOSE_REP")
     public byte[] getSerializeData() {
         if (this.hdr == null) {
             return null;
         }
-
-        if (this.serializeData == null) {
-            try {
-                this.serializeData = Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest);
-            } catch (IOException e) {
-                LOG.error("This really should be impossible.", e);
-                this.serializeData = new byte[32];
-            }
+        try {
+            return Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest);
+        } catch (IOException e) {
+            LOG.error("This really should be impossible.", e);
+            return new byte[32];
         }
-
-        return this.serializeData;
     }
 
     /**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
index 847e3b2fa..2d6ecb631 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
@@ -58,20 +58,19 @@ public class TxnLogProposalIterator implements Iterator<Proposal> {
     @Override
     public Proposal next() {
 
-        Proposal p = new Proposal();
+        Proposal p;
         try {
             byte[] serializedData = Util.marshallTxnEntry(itr.getHeader(), itr.getTxn(), itr.getDigest());
 
             QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader().getZxid(), serializedData, null);
-            p.packet = pp;
-            p.request = null;
-
+            p = new Proposal(pp);
             // This is the only place that can throw IO exception
             hasNext = itr.next();
 
         } catch (IOException e) {
             LOG.error("Unable to read txnlog from disk", e);
             hasNext = false;
+            p = new Proposal();
         }
 
         return p;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index eaad05cd2..d98c97f2c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -54,9 +54,8 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
 import org.apache.zookeeper.server.persistence.SnapStream;
 import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
-import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnDigest;
@@ -323,19 +322,15 @@ public class ZKDatabase {
             wl.lock();
             if (committedLog.size() > commitLogCount) {
                 committedLog.remove();
-                minCommittedLog = committedLog.peek().packet.getZxid();
+                minCommittedLog = committedLog.peek().getZxid();
             }
             if (committedLog.isEmpty()) {
                 minCommittedLog = request.zxid;
                 maxCommittedLog = request.zxid;
             }
-            byte[] data = request.getSerializeData();
-            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
-            Proposal p = new Proposal();
-            p.packet = pp;
-            p.request = request;
+            PureRequestProposal p = new PureRequestProposal(request);
             committedLog.add(p);
-            maxCommittedLog = p.packet.getZxid();
+            maxCommittedLog = p.getZxid();
         } finally {
             wl.unlock();
         }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 3b9c827c3..0b57bb182 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -88,14 +88,60 @@ public class Leader extends LearnerMaster {
 
     public static class Proposal extends SyncedLearnerTracker {
 
-        public QuorumPacket packet;
-        public Request request;
+        private QuorumPacket packet;
+        protected Request request;
+
+        public Proposal() {
+        }
+
+        public Proposal(QuorumPacket packet) {
+            this.packet = packet;
+        }
+
+        public Proposal(Request request, QuorumPacket packet) {
+            this.request = request;
+            this.packet = packet;
+        }
+
+        public QuorumPacket getQuorumPacket() {
+            return packet;
+        }
+
+        public Request getRequest() {
+            return request;
+        }
+
+        public long getZxid() {
+            return packet.getZxid();
+        }
 
         @Override
         public String toString() {
             return packet.getType() + ", " + packet.getZxid() + ", " + request;
         }
+    }
 
+    public static class PureRequestProposal extends Proposal {
+
+        public PureRequestProposal(Request request) {
+            this.request = request;
+        }
+
+        @Override
+        public QuorumPacket getQuorumPacket() {
+            byte[] data = request.getSerializeData();
+            return new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
+        }
+
+        @Override
+        public long getZxid() {
+            return request.zxid;
+        }
+
+        @Override
+        public String toString() {
+            return request.toString();
+        }
     }
 
     // log ack latency if zxid is a multiple of ackLoggingFrequency. If <=0, disable logging.
@@ -1258,9 +1304,7 @@ public class Leader extends LearnerMaster {
         proposalStats.setLastBufferSize(data.length);
         QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
 
-        Proposal p = new Proposal();
-        p.packet = pp;
-        p.request = request;
+        Proposal p = new Proposal(request, pp);
 
         synchronized (this) {
             p.addQuorumVerifier(self.getQuorumVerifier());
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index e9d5cd4e5..049336a16 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -957,7 +957,7 @@ public class LearnerHandler extends ZooKeeperThread {
         while (itr.hasNext()) {
             Proposal propose = itr.next();
 
-            long packetZxid = propose.packet.getZxid();
+            long packetZxid = propose.getZxid();
             // abort if we hit the limit
             if ((maxZxid != null) && (packetZxid > maxZxid)) {
                 break;
@@ -1020,7 +1020,7 @@ public class LearnerHandler extends ZooKeeperThread {
 
             // Since this is already a committed proposal, we need to follow
             // it by a commit packet
-            queuePacket(propose.packet);
+            queuePacket(propose.getQuorumPacket());
             queueOpPacket(Leader.COMMIT, packetZxid);
             queuedZxid = packetZxid;
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
index 01f3a8240..b3e7fa249 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
@@ -123,18 +123,18 @@ public class QuorumOracleMaj extends QuorumMaj {
         LOG.debug("Start Revalidation outstandingProposals");
         try {
             while (outstandingProposal.size() >= 1) {
-                outstandingProposal.sort((o1, o2) -> (int) (o1.packet.getZxid() - o2.packet.getZxid()));
+                outstandingProposal.sort((o1, o2) -> (int) (o1.getZxid() - o2.getZxid()));
 
                 Leader.Proposal p;
                 int i = 0;
                 while (i < outstandingProposal.size()) {
                     p = outstandingProposal.get(i);
-                    if (p.request.zxid > lastCommitted) {
-                        LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:{}", Long.toHexString(p.request.zxid), outstandingProposal.size(), Long.toHexString(lastCommitted));
-                        if (!self.tryToCommit(p, p.request.zxid, null)) {
+                    if (p.getZxid() > lastCommitted) {
+                        LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:{}", Long.toHexString(p.getZxid()), outstandingProposal.size(), Long.toHexString(lastCommitted));
+                        if (!self.tryToCommit(p, p.getZxid(), null)) {
                             break;
                         } else {
-                            lastCommitted = p.request.zxid;
+                            lastCommitted = p.getZxid();
                             outstandingProposal.remove(p);
                         }
                     }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
index 7ac563698..8e39f61e9 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import java.io.File;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Set;
@@ -144,7 +145,9 @@ public class LeaderWithObserverTest {
         long zxid = leader.zk.getZxid();
 
         // things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're not running leader here)
-        leader.newLeaderProposal.packet = new QuorumPacket(0, zxid, null, null);
+        Field field = Leader.Proposal.class.getDeclaredField("packet");
+        field.setAccessible(true);
+        field.set(leader.newLeaderProposal, new QuorumPacket(0, zxid, null, null));
         leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier());
 
         Set<Long> ackSet = leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset();
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
index bbf36367e..43202716d 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
@@ -86,14 +86,14 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         public long getmaxCommittedLog() {
             if (!committedLog.isEmpty()) {
-                return committedLog.getLast().packet.getZxid();
+                return committedLog.getLast().getZxid();
             }
             return 0;
         }
 
         public long getminCommittedLog() {
             if (!committedLog.isEmpty()) {
-                return committedLog.getFirst().packet.getZxid();
+                return committedLog.getFirst().getZxid();
             }
             return 0;
         }
@@ -107,7 +107,7 @@ public class LearnerHandlerTest extends ZKTestCase {
         }
 
         public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid, long limit) {
-            if (peerZxid >= txnLog.peekFirst().packet.getZxid()) {
+            if (peerZxid >= txnLog.peekFirst().getZxid()) {
                 return txnLog.iterator();
             } else {
                 return Collections.emptyIterator();
@@ -150,10 +150,10 @@ public class LearnerHandlerTest extends ZKTestCase {
     }
 
     Proposal createProposal(long zxid) {
-        Proposal p = new Proposal();
-        p.packet = new QuorumPacket();
-        p.packet.setZxid(zxid);
-        p.packet.setType(Leader.PROPOSAL);
+        QuorumPacket packet = new QuorumPacket();
+        packet.setZxid(zxid);
+        packet.setType(Leader.PROPOSAL);
+        Proposal p = new Proposal(packet);
         return p;
     }
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
index 010d69b33..a85e76d01 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
@@ -107,7 +107,7 @@ public class GetProposalFromTxnTest extends ZKTestCase {
         while (itr.hasNext()) {
             Proposal proposal = itr.next();
             TxnLogEntry logEntry = SerializeUtils.deserializeTxn(
-                    proposal.packet.getData());
+                    proposal.getQuorumPacket().getData());
             TxnHeader hdr = logEntry.getHeader();
             Record rec = logEntry.getTxn();
             if (hdr.getType() == OpCode.create) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
index 1c1c72e1a..16a470c31 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
@@ -82,8 +82,8 @@ public class LocalSessionRequestTest extends ZKTestCase {
         QuorumPeer peer = qb.getPeerList().get(peerId);
         ZKDatabase db = peer.getActiveServer().getZKDatabase();
         for (Proposal p : db.getCommittedLog()) {
-            assertFalse(p.request.sessionId == sessionId,
-                    "Should not see " + Request.op2String(p.request.type)
+            assertFalse(p.getRequest().sessionId == sessionId,
+                    "Should not see " + Request.op2String(p.getRequest().type)
                             + " request from local session 0x" + session + " on the " + peerType);
         }
     }

Reply via email to