This is an automated email from the ASF dual-hosted git repository. andor pushed a commit to branch branch-3.9 in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.9 by this push: new 2bd1bb7e5 ZOOKEEPER-4794: Reduce the ZKDatabase#committedLog memory usage. 2bd1bb7e5 is described below commit 2bd1bb7e5d0828d03ddb2cd0d7444530ea10ad88 Author: Yan Zhao <horizo...@apache.org> AuthorDate: Sat Feb 10 00:32:53 2024 +0800 ZOOKEEPER-4794: Reduce the ZKDatabase#committedLog memory usage. Reduce the committed log memory usage. Fix ci. Reviewers: eolivelli, hangc0276, anmolnar Author: horizonzy Closes #2115 from horizonzy/reduce-committed-log-memory-usage (cherry picked from commit 18c78cd10bc02d764a46ac1659b263cf69f2671d) Signed-off-by: Andor Molnar <an...@apache.org> --- .../java/org/apache/zookeeper/server/Request.java | 19 ++------ .../zookeeper/server/TxnLogProposalIterator.java | 7 ++- .../org/apache/zookeeper/server/ZKDatabase.java | 13 ++---- .../org/apache/zookeeper/server/quorum/Leader.java | 54 ++++++++++++++++++++-- .../zookeeper/server/quorum/LearnerHandler.java | 4 +- .../server/quorum/flexible/QuorumOracleMaj.java | 10 ++-- .../server/quorum/LeaderWithObserverTest.java | 5 +- .../server/quorum/LearnerHandlerTest.java | 14 +++--- .../zookeeper/test/GetProposalFromTxnTest.java | 2 +- .../zookeeper/test/LocalSessionRequestTest.java | 4 +- 10 files changed, 82 insertions(+), 50 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index 221f12d26..7f38405f2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -19,7 +19,6 @@ package org.apache.zookeeper.server; import static java.nio.charset.StandardCharsets.UTF_8; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -169,24 +168,16 @@ public class Request { && this.type != OpCode.createSession; } - private transient byte[] serializeData; - - @SuppressFBWarnings(value = "EI_EXPOSE_REP") public byte[] getSerializeData() { if (this.hdr == null) { return null; } - - if (this.serializeData == null) { - try { - this.serializeData = Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest); - } catch (IOException e) { - LOG.error("This really should be impossible.", e); - this.serializeData = new byte[32]; - } + try { + return Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest); + } catch (IOException e) { + LOG.error("This really should be impossible.", e); + return new byte[32]; } - - return this.serializeData; } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java index 847e3b2fa..2d6ecb631 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java @@ -58,20 +58,19 @@ public class TxnLogProposalIterator implements Iterator<Proposal> { @Override public Proposal next() { - Proposal p = new Proposal(); + Proposal p; try { byte[] serializedData = Util.marshallTxnEntry(itr.getHeader(), itr.getTxn(), itr.getDigest()); QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader().getZxid(), serializedData, null); - p.packet = pp; - p.request = null; - + p = new Proposal(pp); // This is the only place that can throw IO exception hasNext = itr.next(); } catch (IOException e) { LOG.error("Unable to read txnlog from disk", e); hasNext = false; + p = new Proposal(); } return p; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index eaad05cd2..d98c97f2c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -54,9 +54,8 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener; import org.apache.zookeeper.server.persistence.SnapStream; import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator; -import org.apache.zookeeper.server.quorum.Leader; import org.apache.zookeeper.server.quorum.Leader.Proposal; -import org.apache.zookeeper.server.quorum.QuorumPacket; +import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.txn.TxnDigest; @@ -323,19 +322,15 @@ public class ZKDatabase { wl.lock(); if (committedLog.size() > commitLogCount) { committedLog.remove(); - minCommittedLog = committedLog.peek().packet.getZxid(); + minCommittedLog = committedLog.peek().getZxid(); } if (committedLog.isEmpty()) { minCommittedLog = request.zxid; maxCommittedLog = request.zxid; } - byte[] data = request.getSerializeData(); - QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null); - Proposal p = new Proposal(); - p.packet = pp; - p.request = request; + PureRequestProposal p = new PureRequestProposal(request); committedLog.add(p); - maxCommittedLog = p.packet.getZxid(); + maxCommittedLog = p.getZxid(); } finally { wl.unlock(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index 3b9c827c3..0b57bb182 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -88,14 +88,60 @@ public class Leader extends LearnerMaster { public static class Proposal extends SyncedLearnerTracker { - public QuorumPacket packet; - public Request request; + private QuorumPacket packet; + protected Request request; + + public Proposal() { + } + + public Proposal(QuorumPacket packet) { + this.packet = packet; + } + + public Proposal(Request request, QuorumPacket packet) { + this.request = request; + this.packet = packet; + } + + public QuorumPacket getQuorumPacket() { + return packet; + } + + public Request getRequest() { + return request; + } + + public long getZxid() { + return packet.getZxid(); + } @Override public String toString() { return packet.getType() + ", " + packet.getZxid() + ", " + request; } + } + public static class PureRequestProposal extends Proposal { + + public PureRequestProposal(Request request) { + this.request = request; + } + + @Override + public QuorumPacket getQuorumPacket() { + byte[] data = request.getSerializeData(); + return new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null); + } + + @Override + public long getZxid() { + return request.zxid; + } + + @Override + public String toString() { + return request.toString(); + } } // log ack latency if zxid is a multiple of ackLoggingFrequency. If <=0, disable logging. @@ -1258,9 +1304,7 @@ public class Leader extends LearnerMaster { proposalStats.setLastBufferSize(data.length); QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null); - Proposal p = new Proposal(); - p.packet = pp; - p.request = request; + Proposal p = new Proposal(request, pp); synchronized (this) { p.addQuorumVerifier(self.getQuorumVerifier()); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index e9d5cd4e5..049336a16 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -957,7 +957,7 @@ public class LearnerHandler extends ZooKeeperThread { while (itr.hasNext()) { Proposal propose = itr.next(); - long packetZxid = propose.packet.getZxid(); + long packetZxid = propose.getZxid(); // abort if we hit the limit if ((maxZxid != null) && (packetZxid > maxZxid)) { break; @@ -1020,7 +1020,7 @@ public class LearnerHandler extends ZooKeeperThread { // Since this is already a committed proposal, we need to follow // it by a commit packet - queuePacket(propose.packet); + queuePacket(propose.getQuorumPacket()); queueOpPacket(Leader.COMMIT, packetZxid); queuedZxid = packetZxid; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java index 01f3a8240..b3e7fa249 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java @@ -123,18 +123,18 @@ public class QuorumOracleMaj extends QuorumMaj { LOG.debug("Start Revalidation outstandingProposals"); try { while (outstandingProposal.size() >= 1) { - outstandingProposal.sort((o1, o2) -> (int) (o1.packet.getZxid() - o2.packet.getZxid())); + outstandingProposal.sort((o1, o2) -> (int) (o1.getZxid() - o2.getZxid())); Leader.Proposal p; int i = 0; while (i < outstandingProposal.size()) { p = outstandingProposal.get(i); - if (p.request.zxid > lastCommitted) { - LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:{}", Long.toHexString(p.request.zxid), outstandingProposal.size(), Long.toHexString(lastCommitted)); - if (!self.tryToCommit(p, p.request.zxid, null)) { + if (p.getZxid() > lastCommitted) { + LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:{}", Long.toHexString(p.getZxid()), outstandingProposal.size(), Long.toHexString(lastCommitted)); + if (!self.tryToCommit(p, p.getZxid(), null)) { break; } else { - lastCommitted = p.request.zxid; + lastCommitted = p.getZxid(); outstandingProposal.remove(p); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java index 7ac563698..8e39f61e9 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.io.File; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.Map; import java.util.Set; @@ -144,7 +145,9 @@ public class LeaderWithObserverTest { long zxid = leader.zk.getZxid(); // things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're not running leader here) - leader.newLeaderProposal.packet = new QuorumPacket(0, zxid, null, null); + Field field = Leader.Proposal.class.getDeclaredField("packet"); + field.setAccessible(true); + field.set(leader.newLeaderProposal, new QuorumPacket(0, zxid, null, null)); leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier()); Set<Long> ackSet = leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java index bbf36367e..43202716d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java @@ -86,14 +86,14 @@ public class LearnerHandlerTest extends ZKTestCase { public long getmaxCommittedLog() { if (!committedLog.isEmpty()) { - return committedLog.getLast().packet.getZxid(); + return committedLog.getLast().getZxid(); } return 0; } public long getminCommittedLog() { if (!committedLog.isEmpty()) { - return committedLog.getFirst().packet.getZxid(); + return committedLog.getFirst().getZxid(); } return 0; } @@ -107,7 +107,7 @@ public class LearnerHandlerTest extends ZKTestCase { } public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid, long limit) { - if (peerZxid >= txnLog.peekFirst().packet.getZxid()) { + if (peerZxid >= txnLog.peekFirst().getZxid()) { return txnLog.iterator(); } else { return Collections.emptyIterator(); @@ -150,10 +150,10 @@ public class LearnerHandlerTest extends ZKTestCase { } Proposal createProposal(long zxid) { - Proposal p = new Proposal(); - p.packet = new QuorumPacket(); - p.packet.setZxid(zxid); - p.packet.setType(Leader.PROPOSAL); + QuorumPacket packet = new QuorumPacket(); + packet.setZxid(zxid); + packet.setType(Leader.PROPOSAL); + Proposal p = new Proposal(packet); return p; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java index 010d69b33..a85e76d01 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java @@ -107,7 +107,7 @@ public class GetProposalFromTxnTest extends ZKTestCase { while (itr.hasNext()) { Proposal proposal = itr.next(); TxnLogEntry logEntry = SerializeUtils.deserializeTxn( - proposal.packet.getData()); + proposal.getQuorumPacket().getData()); TxnHeader hdr = logEntry.getHeader(); Record rec = logEntry.getTxn(); if (hdr.getType() == OpCode.create) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java index 1c1c72e1a..16a470c31 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java @@ -82,8 +82,8 @@ public class LocalSessionRequestTest extends ZKTestCase { QuorumPeer peer = qb.getPeerList().get(peerId); ZKDatabase db = peer.getActiveServer().getZKDatabase(); for (Proposal p : db.getCommittedLog()) { - assertFalse(p.request.sessionId == sessionId, - "Should not see " + Request.op2String(p.request.type) + assertFalse(p.getRequest().sessionId == sessionId, + "Should not see " + Request.op2String(p.getRequest().type) + " request from local session 0x" + session + " on the " + peerType); } }