jsancio commented on code in PR #21028:
URL: https://github.com/apache/kafka/pull/21028#discussion_r2921159062
##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -233,7 +234,11 @@ private void checkOffsetIsValid(long offset) {
private void maybeLoadLog() {
while (log.endOffset().offset() > nextOffset) {
- LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED);
+ LogFetchInfo info = log.read(
+ nextOffset,
+ Isolation.UNCOMMITTED,
+ KafkaRaftClient.MAX_FETCH_SIZE_BYTES
Review Comment:
Hmm. I'd prefer we just use `Integer.MAX_VALUE` or something like that. It
would allow us to eventually remove `KafkaRaftClient.MAX_FETCH_SIZE_BYTES`. Or,
in other words, can we remove `KafkaRaftClient.MAX_FETCH_SIZE_BYTES` in this PR?
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java:
##########
@@ -118,14 +119,19 @@ UnifiedLog log() {
}
@Override
- public LogFetchInfo read(long startOffset, Isolation readIsolation) {
+ public LogFetchInfo read(long startOffset, Isolation readIsolation, int maxTotalBatchSizeBytes) {
FetchIsolation isolation = switch (readIsolation) {
case COMMITTED -> FetchIsolation.HIGH_WATERMARK;
case UNCOMMITTED -> FetchIsolation.LOG_END;
};
try {
- FetchDataInfo fetchInfo = log.read(startOffset, config.internalMaxFetchSizeInBytes(), isolation, true);
+ FetchDataInfo fetchInfo = log.read(
+ startOffset,
+ maxTotalBatchSizeBytes,
+ isolation,
+ true
+ );
Review Comment:
In Java we indent 4 spaces.
```java
FetchDataInfo fetchInfo = log.read(
startOffset,
maxTotalBatchSizeBytes,
isolation,
true
);
```
This comment applies to a few places.
##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -177,6 +191,14 @@ public boolean autoJoin() {
return autoJoin;
}
+ public int fetchSnapshotMaxBytes() {
+ return fetchSnapshotSizeMaxBytes;
+ }
+
+ public int fetchMaxBytes() {
+ return fetchMaxSizeBytes;
+ }
+
Review Comment:
Make these names consistent with the configuration names they correspond to.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1023,6 +1023,63 @@ public void testFetchSnapshotRequestAsFollower(boolean withKip853Rpc) throws IOE
assertEquals(leaderId, response.currentLeader().leaderId());
}
+ @ParameterizedTest
+ @ValueSource(booleans = { false, true })
+ public void testFetchSnapshotRequestWithPartialData(boolean withKip853Rpc) throws Exception {
+ int localId = randomReplicaId();
+ Set<Integer> voters = Set.of(localId, localId + 1);
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
+ List<String> records = List.of("foo", "bar");
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(snapshotId.epoch(), List.of("a"))
+ .withFetchSnapshotMaxBytes(6)
+ .withKip853Rpc(withKip853Rpc)
+ .build();
+
+ context.unattachedToLeader();
+ int epoch = context.currentEpoch();
+
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+ try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId, 0).get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ snapshot.append(records);
+ snapshot.freeze();
+ }
+
+ // Test that we will respond with at least 3 equally sized read of the snapshot.
+ RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
+ int expectedNumberOfReads = 3;
Review Comment:
Why 3 reads? I would assume the invariant is that every fetch snapshot
response returns at most 6 bytes and that the sum of all of the bytes equals
the size of the snapshot.
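A self-contained sketch of that invariant, independent of the Kafka test harness (the class and method names here are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the suggested invariant: every FetchSnapshot response returns
// at most maxBytes, and the responses concatenate to the full snapshot.
public class SnapshotChunkInvariant {
    // Split a snapshot into successive chunks of at most maxBytes each.
    static List<byte[]> readInChunks(byte[] snapshot, int maxBytes) {
        List<byte[]> chunks = new ArrayList<>();
        int position = 0;
        while (position < snapshot.length) {
            int len = Math.min(maxBytes, snapshot.length - position);
            byte[] chunk = new byte[len];
            System.arraycopy(snapshot, position, chunk, 0, len);
            chunks.add(chunk);
            position += len;
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] snapshot = "some-serialized-snapshot-bytes".getBytes();
        int maxBytes = 6;
        List<byte[]> chunks = readInChunks(snapshot, maxBytes);

        int total = 0;
        for (byte[] chunk : chunks) {
            assert chunk.length <= maxBytes;  // no response exceeds the cap
            total += chunk.length;
        }
        assert total == snapshot.length;      // responses cover the whole snapshot
        System.out.println("chunks=" + chunks.size() + " totalBytes=" + total);
    }
}
```

A test written against this invariant would not need to hard-code the expected number of reads at all.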
##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -128,6 +138,8 @@ public class QuorumConfig {
private final int fetchTimeoutMs;
private final int appendLingerMs;
private final boolean autoJoin;
+ private final int fetchSnapshotSizeMaxBytes;
+ private final int fetchMaxSizeBytes;
Review Comment:
Please make the names consistent: `fetchSnapshotSizeMaxBytes` vs
`fetchMaxSizeBytes`. One has `SizeMax` while the other has `MaxSize`. Based on
the configuration names they should be `fetchSnapshotMaxBytes` and
`fetchMaxBytes`.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1023,6 +1023,63 @@ public void testFetchSnapshotRequestAsFollower(boolean withKip853Rpc) throws IOE
assertEquals(leaderId, response.currentLeader().leaderId());
}
+ @ParameterizedTest
+ @ValueSource(booleans = { false, true })
+ public void testFetchSnapshotRequestWithPartialData(boolean withKip853Rpc) throws Exception {
+ int localId = randomReplicaId();
+ Set<Integer> voters = Set.of(localId, localId + 1);
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
+ List<String> records = List.of("foo", "bar");
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(snapshotId.epoch(), List.of("a"))
+ .withFetchSnapshotMaxBytes(6)
Review Comment:
Make 6 a variable so you can use it in the tests below.
##########
raft/src/main/java/org/apache/kafka/raft/RaftLog.java:
##########
@@ -58,8 +58,15 @@ public interface RaftLog extends AutoCloseable {
/**
* Read a set of records within a range of offsets.
+ *
+ * @param startOffsetInclusive Records later than this offset will be returned.
+ * @param isolation Whether to read committed (up to high watermark) or uncommitted data.
+ * @param maxTotalBatchSizeBytes Soft max for number of bytes to retrieve. Will stop returning batches once the
+ * size of previously returned batches exceeds maxTotalBatchSizeBytes
Review Comment:
This is not accurate. Just translate the description from `UnifiedLog#read`:
```
* @param maxLength The maximum number of bytes to read
 * @param isolation The fetch isolation, which controls the maximum offset we are allowed to read
 * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
```
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java:
##########
@@ -359,7 +365,12 @@ public Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId)
fetches from this offset, the returned batch will start at offset (X - M), and the
follower will be unable to append it since (X - M) < (X).
*/
- long baseOffset = read(snapshotId.offset(), Isolation.COMMITTED).startOffsetMetadata.offset();
+ long baseOffset = read(
+ snapshotId.offset(),
+ Isolation.COMMITTED,
+ KafkaRaftClient.MAX_FETCH_SIZE_BYTES
Review Comment:
Okay, but why not read 0 or 1 byte? Doesn't `read` guarantee to return at
least one batch if one exists?
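The question above rests on the `minOneMessage` semantics of `UnifiedLog#read`: the first batch is returned even when it exceeds `maxLength`, so a 1-byte read still yields one whole batch. A toy model of that behavior (illustrative only, not the Kafka implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of minOneMessage: a read with a tiny maxLength (even 1 byte)
// still returns the first batch whole.
public class MinOneMessageRead {
    // Returns the sizes of the batches a read would include.
    static List<Integer> read(List<Integer> batchSizes, int maxLength, boolean minOneMessage) {
        List<Integer> result = new ArrayList<>();
        int bytes = 0;
        for (int size : batchSizes) {
            if (result.isEmpty() && minOneMessage) {
                result.add(size);       // first batch returned regardless of maxLength
                bytes += size;
            } else if (bytes + size <= maxLength) {
                result.add(size);
                bytes += size;
            } else {
                break;                  // stop once the limit would be exceeded
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> batches = List.of(512, 256, 128);
        // Reading with maxLength = 1 still yields exactly the first batch.
        List<Integer> got = read(batches, 1, true);
        assert got.equals(List.of(512));
        System.out.println("batches returned: " + got);
    }
}
```

Under these semantics, passing a 1-byte limit at this call site would fetch exactly the one batch needed for `startOffsetMetadata`.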
##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -487,7 +503,9 @@ public RaftClientTestContext build() throws IOException {
canBecomeVoter,
metrics,
externalKRaftMetrics,
- listener
+ listener,
+ fetchSnapshotMaxBytes,
+ fetchMaxBytes
Review Comment:
Inconsistent indentation.
##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -535,6 +555,8 @@ private RaftClientTestContext(
this.metrics = metrics;
this.externalKRaftMetrics = externalKRaftMetrics;
this.listener = listener;
+ this.fetchSnapshotMaxBytes = fetchSnapshotMaxBytes;
Review Comment:
This variable is not used in this file. Why do you need it? Or are you
missing a check for fetch snapshot requests?
##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -108,6 +108,14 @@ public class QuorumConfig {
"join the cluster metadata partition for its cluster id.";
public static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false;
+ public static final String QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG = QUORUM_PREFIX + "fetch.snapshot.max.bytes";
+ public static final String QUORUM_FETCH_SNAPSHOT_MAX_BYTES_DOC = "Maximum amount of data to retrieve for each FetchSnapshot request to the controller.";
+ public static final int DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES = 1048576;
+
+ public static final String QUORUM_FETCH_MAX_BYTES_CONFIG = QUORUM_PREFIX + "fetch.max.bytes";
+ public static final String QUORUM_FETCH_MAX_BYTES_DOC = "Maximum amount of data to retrieve for each Fetch request. Always returns at least one batch even if it is greater than QUORUM_FETCH_MAX_BYTES_CONFIG.";
Review Comment:
Is this comment resolved?
##########
raft/src/test/java/org/apache/kafka/raft/MockLog.java:
##########
@@ -428,7 +428,8 @@ public LogFetchInfo read(long startOffset, Isolation isolation) {
startOffset, metadataForOffset(startOffset)));
}
- ByteBuffer buffer = ByteBuffer.allocate(512);
+ int bufferSizeBytes = 512;
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSizeBytes);
Review Comment:
This is not correct now that the max total is configurable, no? It may be
easiest to use `ByteBufferOutputStream` and check the size after every batch
write, truncating to `max(maxTotalBatchSizeBytes, firstBatchSize)`. That will
simplify the changes to the if statement you made below.
Let's also write test cases for this new functionality.
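A minimal sketch of the truncation rule suggested here, using `java.io.ByteArrayOutputStream` as a stand-in for Kafka's `ByteBufferOutputStream` so the example is self-contained (names and structure are illustrative, not the MockLog code):

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

// Sketch of the suggested approach: append batches one at a time and check the
// written size after every batch write, capping at
// max(maxTotalBatchSizeBytes, firstBatchSize) so the first batch always fits.
public class BoundedBatchWriter {
    static byte[] writeBatches(List<byte[]> batches, int maxTotalBatchSizeBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (batches.isEmpty()) {
            return out.toByteArray();
        }
        // The first batch may exceed the cap, so it raises the effective limit.
        int cap = Math.max(maxTotalBatchSizeBytes, batches.get(0).length);
        for (byte[] batch : batches) {
            if (out.size() + batch.length > cap) {
                break;              // check the size after every batch write
            }
            out.write(batch, 0, batch.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<byte[]> batches = List.of(new byte[10], new byte[10], new byte[10]);
        // Cap of 15 bytes fits only the first 10-byte batch.
        assert writeBatches(batches, 15).length == 10;
        // A cap smaller than the first batch still returns that batch whole.
        assert writeBatches(batches, 4).length == 10;
    }
}
```

This removes the need for a fixed 512-byte buffer entirely, since the output grows as batches are appended and stops at the computed cap.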
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]