[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r624230147 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -966,6 +967,54 @@ public void verify() { } } +private static class SnapshotAtLogStart implements Invariant { Review comment: Okay. The simulations now take less than a minute in total on my machine, which matches what I see on trunk on the same machine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r624200356 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -966,6 +967,54 @@ public void verify() { } } +private static class SnapshotAtLogStart implements Invariant { Review comment: Good catch. I made some improvements. After the improvements, the simulation time went from 1 minute on trunk to 1:41 minutes for this PR. It looks like a good amount of that time is spent in `MockLog.validateOffsetAndEpoch`. Let me see what we can do, but we can also remove that functionality from the verification.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r624166487 ## File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java ## @@ -75,56 +75,6 @@ public void testTopicId() { assertEquals(topicId, log.topicId()); } -@Test -public void testAppendAsLeaderHelper() { Review comment: I actually moved this to `testAppendAsLeader`. The two tests were testing almost the same things. I just merged them so that one test now covers all of the cases that didn't overlap.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r623370656 ## File path: raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + + Review comment: Lines removed.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r623370427 ## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -426,48 +422,55 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) { public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {} @Override -public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) { -if (logStartOffset() > logStartSnapshotId.offset || -highWatermark.offset < logStartSnapshotId.offset) { - +public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) { +if (logStartOffset() > snapshotId.offset) { +throw new OffsetOutOfRangeException( +String.format( +"New log start (%s) is less than the curent log start offset (%s)", +snapshotId, +logStartOffset() +) +); +} +if (highWatermark.offset < snapshotId.offset) { throw new OffsetOutOfRangeException( String.format( -"New log start (%s) is less than start offset (%s) or is greater than the high watermark (%s)", -logStartSnapshotId, -logStartOffset(), +"New log start (%s) is greater than the high watermark (%s)", +snapshotId, highWatermark.offset ) ); } boolean updated = false; -Optional snapshotIdOpt = latestSnapshotId(); -if (snapshotIdOpt.isPresent()) { -OffsetAndEpoch snapshotId = snapshotIdOpt.get(); -if (startOffset() < logStartSnapshotId.offset && -highWatermark.offset >= logStartSnapshotId.offset && -snapshotId.offset >= logStartSnapshotId.offset) { +if (snapshots.containsKey(snapshotId)) { +snapshots.headMap(snapshotId, false).clear(); -snapshots.headMap(logStartSnapshotId, false).clear(); +// Update the high watermark if it is less than the new log start offset +if (snapshotId.offset > highWatermark.offset) { Review comment: You are correct. This is not needed.
## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -184,6 +187,22 @@ Builder appendToLog(int epoch, List records) { return this; } +Builder withSnapshot(OffsetAndEpoch snapshotId) throws IOException { Review comment: Done.
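For readers following the `deleteBeforeSnapshot` thread, the two separate range checks from the diff can be sketched in isolation. This is only an illustration under stated assumptions: `SnapshotRangeCheck` and `validate` are hypothetical names, plain `long` offsets stand in for `OffsetAndEpoch`, and `IllegalArgumentException` stands in for Kafka's `OffsetOutOfRangeException` so that the snippet has no dependencies. It also shows why the later high-watermark update was judged unnecessary: once the second check passes, the snapshot offset cannot exceed the high watermark.

```java
// Hypothetical stand-alone sketch; this is not the MockLog implementation.
final class SnapshotRangeCheck {
    private SnapshotRangeCheck() {}

    // Rejects a new log start offset outside [logStartOffset, highWatermark].
    // Each bound is checked on its own so the error message names the bound
    // that was actually violated.
    static void validate(long snapshotOffset, long logStartOffset, long highWatermark) {
        if (logStartOffset > snapshotOffset) {
            throw new IllegalArgumentException(
                String.format(
                    "New log start (%d) is less than the current log start offset (%d)",
                    snapshotOffset,
                    logStartOffset
                )
            );
        }
        if (highWatermark < snapshotOffset) {
            throw new IllegalArgumentException(
                String.format(
                    "New log start (%d) is greater than the high watermark (%d)",
                    snapshotOffset,
                    highWatermark
                )
            );
        }
        // From here on snapshotOffset <= highWatermark always holds, so a
        // later "snapshotOffset > highWatermark" branch could never run.
    }

    public static void main(String[] args) {
        validate(5L, 0L, 10L); // inside the range, no exception
        try {
            validate(11L, 0L, 10L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Splitting the combined condition costs a few lines but makes a failing test report which bound was crossed.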
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r623370233 ## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -363,33 +362,30 @@ public LogFetchInfo read(long startOffset, Isolation isolation) { } ByteBuffer buffer = ByteBuffer.allocate(512); -LogEntry firstEntry = null; +LogOffsetMetadata batchStartOffset = null; for (LogBatch batch : batches) { // Note that start offset is inclusive while max offset is exclusive. We only return // complete batches, so batches which end at an offset larger than the max offset are // filtered, which is effectively the same as having the consumer drop an incomplete // batch returned in a fetch response. -if (batch.lastOffset() >= startOffset) { -if (batch.lastOffset() < maxOffset) { -buffer = batch.writeTo(buffer); -} +if (batch.lastOffset() >= startOffset && batch.lastOffset() < maxOffset && !batch.entries.isEmpty()) { Review comment: I wanted to keep the tests reproducible. As a compromise, I changed this code to read at most 2 batches. We can revisit this in the future and make it random but reproducible.
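One way the "random but reproducible" idea mentioned above could look is a batch limit drawn from a seeded `java.util.Random`. This is a sketch of the general technique only, not something taken from the PR: `SeededBatchLimit`, its `sequence` helper, and the idea of passing the seed through the constructor are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper; it is not part of MockLog.
final class SeededBatchLimit {
    private final Random random;
    private final int maxBatches;

    SeededBatchLimit(long seed, int maxBatches) {
        if (maxBatches < 1) {
            throw new IllegalArgumentException("maxBatches must be at least 1");
        }
        // A fixed seed makes the sequence of limits identical on every run.
        this.random = new Random(seed);
        this.maxBatches = maxBatches;
    }

    // Returns how many batches the next read may return, in [1, maxBatches].
    int nextLimit() {
        return 1 + random.nextInt(maxBatches);
    }

    // Convenience for comparing two runs that share a seed.
    static List<Integer> sequence(long seed, int maxBatches, int count) {
        SeededBatchLimit limit = new SeededBatchLimit(seed, maxBatches);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(limit.nextLimit());
        }
        return result;
    }

    public static void main(String[] args) {
        // Two runs with the same seed produce the same limits.
        System.out.println(sequence(42L, 2, 10).equals(sequence(42L, 2, 10)));
    }
}
```

A failing simulation could then be replayed by logging the seed, which a fixed limit of 2 batches gives for free at the cost of less varied reads.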
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r623369093 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -717,7 +720,8 @@ void start(int nodeId) { persistentState.store, logContext, time, -random +random, +serde Review comment: Yes. Good idea. Done.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r623368881 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -1081,24 +1103,25 @@ OptionalInt currentClaimedEpoch() { .orElse(null); } -@Override -public void handleClaim(int epoch) { -// We record the next expected offset as the claimed epoch's start -// offset. This is useful to verify that the `handleClaim` callback -// was not received early. -long claimedEpochStartOffset = lastCommitOffset().isPresent() ? -lastCommitOffset().getAsLong() + 1 : 0L; -this.currentClaimedEpoch = OptionalInt.of(epoch); -this.claimedEpochStartOffsets.put(epoch, claimedEpochStartOffset); +Optional> takeSnapshot() { Review comment: Yep. Changed the name as suggested. The name was inspired by a naming pattern used in Rust (https://doc.rust-lang.org/std/option/enum.Option.html#method.take), but I see that we use `takeSnapshot` in other parts of Kafka for creating snapshots.
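For context on the Rust reference, `Option::take` returns the current value and leaves `None` behind. A rough Java equivalent of that behaviour is sketched below. `Slot`, `put`, and `drain` are hypothetical names chosen for the illustration; the thread does not state the name the method was renamed to.

```java
import java.util.Optional;

// Hypothetical holder; it is not the listener in RaftClientTestContext.
final class Slot<T> {
    private Optional<T> value = Optional.empty();

    void put(T newValue) {
        value = Optional.of(newValue);
    }

    // Returns the stored value, if any, and leaves the slot empty,
    // similar in spirit to Rust's Option::take.
    Optional<T> drain() {
        Optional<T> result = value;
        value = Optional.empty();
        return result;
    }

    public static void main(String[] args) {
        Slot<String> slot = new Slot<>();
        slot.put("snapshot");
        System.out.println(slot.drain().isPresent()); // the value is handed out once
        System.out.println(slot.drain().isPresent()); // the slot is now empty
    }
}
```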
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r623367561 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class RecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int batchSize; + +private Iterator nextBatches = Collections.emptyIterator(); +private Optional> nextBatch = Optional.empty(); +// Buffer used as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records read up to now +private int bytesRead = 0; +private boolean isClosed = false; + +public RecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int batchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC); +} + +@Override +public boolean hasNext() { +ensureOpen(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch 
= Optional.empty(); + +return batch; +} + +@Override +public void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void ensureOpen() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private MemoryRecords readFileRecords(FileRecords fileRecords, ByteBuffer buffer) { +int start = buffer.position(); +try { +fileRecords.readInto(buffer, bytesRead); +} catch (IOException e) { +throw new RuntimeException("Failed to read records into memory", e); +} + +bytesRead += buffer.limit() - start; +return MemoryRecords.readableRecords(buffer.slice()); +} + +private MemoryRecords createMemoryRecords(FileRecords fileRecords) { +final ByteBuffer buffer; +if (allocatedBuffer.isPresent()) { +buffer = allocatedBuffer.get(); +buffer.compact(); +} else { +buffer = bufferSupplier.get(Math.min(batchSize, records.sizeInBytes())); +allocatedBuffer = Optional.of(buffer); +} + +MemoryRecords memoryRecords = readFileRecords(fileRecords, buffer); + +if (memoryRecords.firstBatchSize() < buffer.remaining()) { Review comment: Yes. Added a comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
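The `hasNext`/`next` pair in `RecordsIterator` follows a one-element lookahead pattern: `hasNext` fetches and caches the next batch, and `next` hands the cached batch out and clears it. A generic sketch of that pattern follows. `LookaheadIterator` and `wrap` are hypothetical names, and the `Supplier` source is an assumption standing in for the private `nextBatch()` method in the diff.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical generic version of the lookahead pattern; not Kafka code.
final class LookaheadIterator<T> implements Iterator<T>, AutoCloseable {
    private final Supplier<Optional<T>> source;
    private Optional<T> nextItem = Optional.empty();
    private boolean isClosed = false;

    LookaheadIterator(Supplier<Optional<T>> source) {
        this.source = source;
    }

    // Adapts a plain iterator into the Optional-returning source.
    static <T> LookaheadIterator<T> wrap(Iterator<T> items) {
        Supplier<Optional<T>> source = () -> {
            if (items.hasNext()) {
                return Optional.of(items.next());
            }
            return Optional.empty();
        };
        return new LookaheadIterator<T>(source);
    }

    @Override
    public boolean hasNext() {
        ensureOpen();
        // Only pull from the source when nothing is cached, so repeated
        // hasNext() calls never skip an element.
        if (!nextItem.isPresent()) {
            nextItem = source.get();
        }
        return nextItem.isPresent();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException("Iterator doesn't have any more elements");
        }
        T item = nextItem.get();
        nextItem = Optional.empty();
        return item;
    }

    @Override
    public void close() {
        isClosed = true;
    }

    private void ensureOpen() {
        if (isClosed) {
            throw new IllegalStateException("Iterator was closed");
        }
    }

    public static void main(String[] args) {
        try (LookaheadIterator<String> it = wrap(Arrays.asList("a", "b").iterator())) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}
```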
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r623367352 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -311,8 +311,18 @@ private void updateListenersProgress(long highWatermark) { private void updateListenersProgress(List listenerContexts, long highWatermark) { for (ListenerContext listenerContext : listenerContexts) { listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> { -if (nextExpectedOffset < log.startOffset()) { -listenerContext.fireHandleSnapshot(log.startOffset()); +if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) { +SnapshotReader snapshot = latestSnapshot().orElseThrow(() -> { +return new IllegalStateException( +String.format( +"Snapshot expected when next offset is %s, log start offset is %s and high-watermark is %s", Review comment: Good idea. I decided to use `getClass().getTypeName()`.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r623367035 ## File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java ## @@ -60,19 +61,21 @@ @Override void close(); -class Batch { +final class Batch implements Iterable { private final long baseOffset; private final int epoch; +private final long lastOffset; private final List records; -public Batch(long baseOffset, int epoch, List records) { +private Batch(long baseOffset, int epoch, long lastOffset, List records) { Review comment: Yes. I documented the static methods for creating objects of this type.
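The private constructor plus documented static methods is the usual static-factory arrangement. The sketch below shows the shape of it under assumptions: `SimpleBatch` is a hypothetical stand-in for `BatchReader.Batch`, the factory name `of` and the rule that `lastOffset` equals `baseOffset + records.size() - 1` are guesses made for the illustration and are not confirmed by the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for BatchReader.Batch; factory name and offset rule are assumptions.
final class SimpleBatch<T> implements Iterable<T> {
    private final long baseOffset;
    private final int epoch;
    private final long lastOffset;
    private final List<T> records;

    // Private so callers cannot pass an inconsistent lastOffset.
    private SimpleBatch(long baseOffset, int epoch, long lastOffset, List<T> records) {
        this.baseOffset = baseOffset;
        this.epoch = epoch;
        this.lastOffset = lastOffset;
        this.records = records;
    }

    /**
     * Creates a batch of records; the last offset is derived from the
     * base offset and the number of records.
     */
    static <T> SimpleBatch<T> of(long baseOffset, int epoch, List<T> records) {
        if (records.isEmpty()) {
            throw new IllegalArgumentException("records must not be empty");
        }
        List<T> copy = Collections.unmodifiableList(new ArrayList<>(records));
        return new SimpleBatch<>(baseOffset, epoch, baseOffset + copy.size() - 1, copy);
    }

    long baseOffset() {
        return baseOffset;
    }

    int epoch() {
        return epoch;
    }

    long lastOffset() {
        return lastOffset;
    }

    @Override
    public Iterator<T> iterator() {
        return records.iterator();
    }

    public static void main(String[] args) {
        SimpleBatch<String> batch = SimpleBatch.of(10L, 3, Arrays.asList("a", "b", "c"));
        System.out.println(batch.baseOffset() + ".." + batch.lastOffset());
    }
}
```

Keeping the constructor private means the invariant between the offsets and the record list lives in one place, which is what makes documenting the factories worthwhile.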
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r623366415 ## File path: core/src/main/scala/kafka/tools/TestRaftServer.scala ## @@ -226,7 +232,11 @@ class TestRaftServer( reader.close() } -case _ => +case HandleSnapshot(reader) => + // Ignore snapshots; only interested on records appended by this leader Review comment: Fixed.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610995466 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2154,8 +2182,14 @@ private boolean maybeCompleteShutdown(long currentTimeMs) { return false; } -private void maybeUpdateOldestSnapshotId() { -log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot); +private void maybeUpdateEarliestSnapshotId() { Review comment: I changed the method name.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610995406 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -326,6 +336,14 @@ private void updateListenersProgress(List listenerContexts, lon } } +private Optional> latestSnapshot() { +return log.latestSnapshotId().flatMap(snapshoId -> { +return log Review comment: Done.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610995402 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -326,6 +336,14 @@ private void updateListenersProgress(List listenerContexts, lon } } +private Optional> latestSnapshot() { +return log.latestSnapshotId().flatMap(snapshoId -> { Review comment: Done.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610995361 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -68,20 +70,65 @@ public synchronized void increment() { @Override public synchronized void handleCommit(BatchReader reader) { try { -int initialValue = this.committed; +int initialValue = committed; while (reader.hasNext()) { BatchReader.Batch batch = reader.next(); log.debug("Handle commit of batch with records {} at base offset {}", batch.records(), batch.baseOffset()); for (Integer value : batch.records()) { -if (value != this.committed + 1) { -throw new AssertionError("Expected next committed value to be " + -(this.committed + 1) + ", but instead found " + value + " on node " + nodeId); +if (value != committed + 1) { Review comment: Done. Not sure if it is any better :smile:
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610995276 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -68,20 +70,65 @@ public synchronized void increment() { @Override public synchronized void handleCommit(BatchReader reader) { try { -int initialValue = this.committed; +int initialValue = committed; while (reader.hasNext()) { BatchReader.Batch batch = reader.next(); log.debug("Handle commit of batch with records {} at base offset {}", batch.records(), batch.baseOffset()); for (Integer value : batch.records()) { -if (value != this.committed + 1) { -throw new AssertionError("Expected next committed value to be " + -(this.committed + 1) + ", but instead found " + value + " on node " + nodeId); +if (value != committed + 1) { +throw new AssertionError( +String.format( +"Expected next committed value to be %s, but instead found %s on node %s", +committed + 1, +value, +nodeId +) +); } -this.committed = value; +committed = value; } + +nextReadOffset = batch.lastOffset() + 1; +readEpoch = batch.epoch(); } log.debug("Counter incremented from {} to {}", initialValue, committed); + +if (lastSnapshotEndOffset + 10 < nextReadOffset) { Review comment: Made it a variable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610995276 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -68,20 +70,65 @@ public synchronized void increment() { @Override public synchronized void handleCommit(BatchReader reader) { try { -int initialValue = this.committed; +int initialValue = committed; while (reader.hasNext()) { BatchReader.Batch batch = reader.next(); log.debug("Handle commit of batch with records {} at base offset {}", batch.records(), batch.baseOffset()); for (Integer value : batch.records()) { -if (value != this.committed + 1) { -throw new AssertionError("Expected next committed value to be " + -(this.committed + 1) + ", but instead found " + value + " on node " + nodeId); +if (value != committed + 1) { +throw new AssertionError( +String.format( +"Expected next committed value to be %s, but instead found %s on node %s", +committed + 1, +value, +nodeId +) +); } -this.committed = value; +committed = value; } + +nextReadOffset = batch.lastOffset() + 1; +readEpoch = batch.epoch(); } log.debug("Counter incremented from {} to {}", initialValue, committed); + +if (lastSnapshotEndOffset + 10 < nextReadOffset) { Review comment: Made it configurable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
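The snapshot condition in the diff, `lastSnapshotEndOffset + 10 < nextReadOffset`, hard-codes the distance between snapshots. A minimal sketch of making that distance a parameter is shown below. `SnapshotTrigger`, `snapshotDelayInRecords`, and `onSnapshot` are hypothetical names, and how `ReplicatedCounter` actually exposes the setting is not shown in this thread.

```java
// Hypothetical sketch; not the ReplicatedCounter implementation.
final class SnapshotTrigger {
    private final long snapshotDelayInRecords;
    private long lastSnapshotEndOffset = 0L;

    SnapshotTrigger(long snapshotDelayInRecords) {
        if (snapshotDelayInRecords < 0) {
            throw new IllegalArgumentException("snapshotDelayInRecords must not be negative");
        }
        this.snapshotDelayInRecords = snapshotDelayInRecords;
    }

    // Same comparison as in the diff, with the literal 10 replaced by a field.
    boolean shouldSnapshot(long nextReadOffset) {
        return lastSnapshotEndOffset + snapshotDelayInRecords < nextReadOffset;
    }

    // Records where the most recent snapshot ended.
    void onSnapshot(long endOffset) {
        lastSnapshotEndOffset = endOffset;
    }

    public static void main(String[] args) {
        SnapshotTrigger trigger = new SnapshotTrigger(10L);
        System.out.println(trigger.shouldSnapshot(10L)); // 0 + 10 < 10 does not hold
        System.out.println(trigger.shouldSnapshot(11L)); // 0 + 10 < 11 holds
    }
}
```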
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610995124 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -355,24 +373,29 @@ private void fireHandleResign(int epoch) { } @Override -public void initialize() throws IOException { -quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); +public void initialize() { +try { +quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); -long currentTimeMs = time.milliseconds(); -if (quorum.isLeader()) { -throw new IllegalStateException("Voter cannot initialize as a Leader"); -} else if (quorum.isCandidate()) { -onBecomeCandidate(currentTimeMs); -} else if (quorum.isFollower()) { -onBecomeFollower(currentTimeMs); -} +long currentTimeMs = time.milliseconds(); +if (quorum.isLeader()) { +throw new IllegalStateException("Voter cannot initialize as a Leader"); +} else if (quorum.isCandidate()) { +onBecomeCandidate(currentTimeMs); +} else if (quorum.isFollower()) { +onBecomeFollower(currentTimeMs); +} -// When there is only a single voter, become candidate immediately -if (quorum.isVoter() -&& quorum.remoteVoters().isEmpty() -&& !quorum.isLeader() -&& !quorum.isCandidate()) { -transitionToCandidate(currentTimeMs); +// When there is only a single voter, become candidate immediately +if (quorum.isVoter() +&& quorum.remoteVoters().isEmpty() +&& !quorum.isLeader() Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610995053 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java ## @@ -163,48 +85,46 @@ public OptionalLong lastOffset() { @Override public void close() { -isClosed = true; +if (!isClosed) { +isClosed = true; -if (allocatedBuffer != null) { -bufferSupplier.release(allocatedBuffer); +iterator.close(); +closeListener.onClose(this); } - -closeListener.onClose(this); } -public T readRecord(Readable input) { -// Read size of body in bytes -input.readVarint(); - -// Read unused attributes -input.readByte(); - -long timestampDelta = input.readVarlong(); -if (timestampDelta != 0) { -throw new IllegalArgumentException(); -} - -// Read offset delta -input.readVarint(); - -int keySize = input.readVarint(); -if (keySize != -1) { -throw new IllegalArgumentException("Unexpected key size " + keySize); -} +public static RecordsBatchReader of( +long baseOffset, +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize, +CloseListener> closeListener +) { +return new RecordsBatchReader<>( +baseOffset, +new SerdeRecordsIterator<>(records, serde, bufferSupplier, maxBatchSize), +closeListener +); +} -int valueSize = input.readVarint(); -if (valueSize < 0) { -throw new IllegalArgumentException(); +private void checkIfClosed() { Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
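The revised `close()` in the diff guards its body with `if (!isClosed)`, so the iterator is closed and the listener notified at most once. The same idea in isolation is sketched below. `OnceCloseable` is a hypothetical name and a plain `Runnable` stands in for the `CloseListener` callback.

```java
// Hypothetical sketch of an idempotent close; not RecordsBatchReader itself.
final class OnceCloseable implements AutoCloseable {
    private final Runnable onClose;
    private boolean isClosed = false;

    OnceCloseable(Runnable onClose) {
        this.onClose = onClose;
    }

    @Override
    public void close() {
        // Only the first call releases resources and notifies the listener;
        // later calls are no-ops.
        if (!isClosed) {
            isClosed = true;
            onClose.run();
        }
    }

    boolean isClosed() {
        return isClosed;
    }

    public static void main(String[] args) {
        int[] notifications = {0};
        OnceCloseable reader = new OnceCloseable(() -> notifications[0]++);
        reader.close();
        reader.close();
        System.out.println(notifications[0]);
    }
}
```

Without the guard, a caller that closes the reader twice would release the buffer and fire the listener twice, which is the behaviour the change avoids.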
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610955955 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int maxBatchSize; + +private Optional> nextBatches = Optional.empty(); +private Optional> nextBatch = Optional.empty(); +// Buffer used to as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records that read +private int bytesRead = 0; +private boolean isClosed = false; + +public SerdeRecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSize = maxBatchSize; +} + +@Override +public boolean hasNext() { +checkIfClosed(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch = Optional.empty(); + +return batch; +} + +@Override +public 
void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void checkIfClosed() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private Optional> nextBatches() { +int recordSize = records.sizeInBytes(); +if (bytesRead < recordSize) { +final MemoryRecords memoryRecords; +if (records instanceof MemoryRecords) { +bytesRead = recordSize; +memoryRecords = (MemoryRecords) records; +} else if (records instanceof FileRecords) { +final ByteBuffer buffer; +if (allocatedBuffer.isPresent()) { +buffer = allocatedBuffer.get(); +buffer.compact(); + +if (!buffer.hasRemaining()) { +// The buffer is not big enough to read an entire batch +throw new IllegalStateException( +String.format( +"Unable to read batch from file records buffer %s with maximum batch %s and record size %s", +buffer, +maxBatchSize, +records.sizeInBytes() +) +); +} Review comment: Yeah, got it. I thought about it some more. This is currently only used for the metadata partitions. If the controller or broker cannot read the metadata partition the
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610952440 ## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -418,48 +414,49 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) { public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {} @Override -public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) { -if (logStartOffset() > logStartSnapshotId.offset || -highWatermark.offset < logStartSnapshotId.offset) { +public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) { +if (logStartOffset() > snapshotId.offset || +highWatermark.offset < snapshotId.offset) { throw new OffsetOutOfRangeException( String.format( "New log start (%s) is less than start offset (%s) or is greater than the high watermark (%s)", Review comment: Done.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610951347 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int maxBatchSize; + +private Optional> nextBatches = Optional.empty(); +private Optional> nextBatch = Optional.empty(); +// Buffer used to as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records that read +private int bytesRead = 0; +private boolean isClosed = false; + +public SerdeRecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSize = maxBatchSize; +} + +@Override +public boolean hasNext() { +checkIfClosed(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch = Optional.empty(); + +return batch; +} + +@Override +public 
void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void checkIfClosed() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private Optional> nextBatches() { +int recordSize = records.sizeInBytes(); +if (bytesRead < recordSize) { +final MemoryRecords memoryRecords; +if (records instanceof MemoryRecords) { +bytesRead = recordSize; +memoryRecords = (MemoryRecords) records; +} else if (records instanceof FileRecords) { +final ByteBuffer buffer; +if (allocatedBuffer.isPresent()) { +buffer = allocatedBuffer.get(); +buffer.compact(); + +if (!buffer.hasRemaining()) { +// The buffer is not big enough to read an entire batch +throw new IllegalStateException( +String.format( +"Unable to read batch from file records buffer %s with maximum batch %s and record size %s", +buffer, +maxBatchSize, +records.sizeInBytes() +) +); +} +} else { +buffer = bufferSupplier.get(Math.min(maxBatchSize, records.sizeInBytes())); +allocatedBuffer = Optional.of(buffer); +
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610949543 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { Review comment: By the way, Kafka already has `org.apache.kafka.common.record.RecordBatchIterator`. To me, the name of the iterator should represent what it iterates over, not what it generates. In this case it is iterating over `Records` (`MemoryRecords` and `FileRecords`). When we implement @mumrah's suggestion of having separate implementations for each, we will have `MemoryRecordsIterator` and `FileRecordsIterator`. I'll change the name to `RecordsIterator`.
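The iterator quoted in the diff above caches the next batch in an `Optional`, so that `hasNext()` can be called repeatedly without consuming anything, and it refuses further use once closed. A minimal sketch of that look-ahead pattern follows. The class name `LookAheadIterator` and its generic element source are hypothetical stand-ins chosen for illustration; this is not the `RecordsIterator` from the PR and it reads no Kafka `Records`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;

/** Hypothetical sketch of a closeable look-ahead iterator; not the PR's class. */
final class LookAheadIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> source;
    // Holds the element fetched by hasNext() until next() hands it out.
    private Optional<T> next = Optional.empty();
    private boolean closed = false;

    LookAheadIterator(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public boolean hasNext() {
        if (closed) {
            throw new IllegalStateException("Iterator was closed");
        }
        // Fetch at most one element ahead; repeated calls do not advance the source.
        if (!next.isPresent() && source.hasNext()) {
            next = Optional.of(source.next());
        }
        return next.isPresent();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException("No more elements");
        }
        T value = next.get();
        next = Optional.empty();
        return value;
    }

    @Override
    public void close() {
        // A real implementation would also release buffers here.
        closed = true;
    }

    public static void main(String[] args) {
        try (LookAheadIterator<String> it =
                 new LookAheadIterator<>(Arrays.asList("a", "b").iterator())) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}
```

Because `Optional.of` rejects `null`, this sketch assumes the source never yields `null` elements.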
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610944539 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -68,20 +70,65 @@ public synchronized void increment() { @Override public synchronized void handleCommit(BatchReader reader) { try { -int initialValue = this.committed; +int initialValue = committed; while (reader.hasNext()) { BatchReader.Batch batch = reader.next(); log.debug("Handle commit of batch with records {} at base offset {}", batch.records(), batch.baseOffset()); for (Integer value : batch.records()) { -if (value != this.committed + 1) { -throw new AssertionError("Expected next committed value to be " + -(this.committed + 1) + ", but instead found " + value + " on node " + nodeId); +if (value != committed + 1) { +throw new AssertionError( +String.format( +"Expected next committed value to be %s, but instead found %s on node %s", +committed + 1, +value, +nodeId +) +); } -this.committed = value; +committed = value; } + +nextReadOffset = batch.lastOffset() + 1; +readEpoch = batch.epoch(); } log.debug("Counter incremented from {} to {}", initialValue, committed); + +if (lastSnapshotEndOffset + 10 < nextReadOffset) { Review comment: Hmm. We should only generate snapshots at batch boundaries. There is no guarantee that the `lastOffset` of a batch is a multiple of some number, since batches can contain any number of records.
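To illustrate the point about batch boundaries, here is a small sketch of the threshold check `lastSnapshotEndOffset + 10 < nextReadOffset` applied to a sequence of batches of uneven size. The class `BatchBoundarySnapshots` and its method are hypothetical, and two things are assumptions rather than facts from the diff: the check is evaluated after every batch, and a snapshot ends at `nextReadOffset`, which then becomes the new `lastSnapshotEndOffset`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; not the ReplicatedCounter from the PR. */
final class BatchBoundarySnapshots {
    // Minimum distance between snapshots; 10 mirrors the constant in the diff.
    static final long SNAPSHOT_INTERVAL = 10;

    /**
     * Takes the last offset of each committed batch and returns the
     * (exclusive) end offsets at which a snapshot would be generated.
     * Every returned offset is a batch boundary by construction.
     */
    static List<Long> snapshotEndOffsets(long[] batchLastOffsets) {
        List<Long> snapshots = new ArrayList<>();
        long lastSnapshotEndOffset = 0;
        for (long lastOffset : batchLastOffsets) {
            long nextReadOffset = lastOffset + 1;
            // Threshold comparison instead of a modulo test: a batch can end
            // at any offset, so "offset % 10 == 0" might never be true.
            if (lastSnapshotEndOffset + SNAPSHOT_INTERVAL < nextReadOffset) {
                snapshots.add(nextReadOffset);
                lastSnapshotEndOffset = nextReadOffset;
            }
        }
        return snapshots;
    }

    public static void main(String[] args) {
        long[] batchLastOffsets = {3, 8, 14, 17, 29, 30};
        System.out.println(snapshotEndOffsets(batchLastOffsets)); // prints [15, 30]
    }
}
```

With these batches no boundary falls on a multiple of 10, yet snapshots are still taken at offsets 15 and 30, each right after a complete batch.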
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610942230 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -68,20 +70,65 @@ public synchronized void increment() { @Override public synchronized void handleCommit(BatchReader reader) { try { -int initialValue = this.committed; +int initialValue = committed; while (reader.hasNext()) { BatchReader.Batch batch = reader.next(); log.debug("Handle commit of batch with records {} at base offset {}", batch.records(), batch.baseOffset()); for (Integer value : batch.records()) { -if (value != this.committed + 1) { -throw new AssertionError("Expected next committed value to be " + -(this.committed + 1) + ", but instead found " + value + " on node " + nodeId); +if (value != committed + 1) { +throw new AssertionError( +String.format( +"Expected next committed value to be %s, but instead found %s on node %s", +committed + 1, +value, +nodeId +) +); } -this.committed = value; +committed = value; } + +nextReadOffset = batch.lastOffset() + 1; Review comment: Yep. Not sure what I was thinking when I added those fields.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610939585 ## File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java ## @@ -37,17 +36,21 @@ * * @throws IOException for any IO error while reading the size Review comment: Done.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610939450 ## File path: raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java ## @@ -124,6 +125,11 @@ public void handleCommit(BatchReader reader) { } } +@Override +public void handleSnapshot(SnapshotReader reader) { +reader.close(); Review comment: In the future, yes. Simply ignoring the snapshot is okay for now, for the following reasons: 1. We are getting rid of this shim in https://github.com/apache/kafka/pull/10497. 2. None of the KRaft listeners generate snapshots, so this should never be called. 3. We will implement snapshot loading for the controller and broker as part of this Jira: https://issues.apache.org/jira/browse/KAFKA-12466
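As a rough sketch of the "ignore but still close" behavior described above: a listener that does not load snapshots yet should still close the reader it is handed, so that whatever the reader holds is released. `FakeSnapshotReader` below is a hypothetical stand-in that models only `close()`; it is not the `SnapshotReader` type from the PR.

```java
/** Hypothetical illustration; not the MetaLogRaftShim from the PR. */
final class IgnoreSnapshotSketch {

    /** Stand-in reader that only tracks whether close() was called. */
    static final class FakeSnapshotReader implements AutoCloseable {
        private boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Ignores the snapshot contents but releases the reader. */
    static void handleSnapshot(FakeSnapshotReader reader) {
        // Nothing is read from the snapshot; closing it is the only obligation.
        reader.close();
    }

    public static void main(String[] args) {
        FakeSnapshotReader reader = new FakeSnapshotReader();
        handleSnapshot(reader);
        System.out.println("closed = " + reader.isClosed());
    }
}
```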
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610934045 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int maxBatchSize; + +private Optional> nextBatches = Optional.empty(); +private Optional> nextBatch = Optional.empty(); +// Buffer used to as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records that read +private int bytesRead = 0; +private boolean isClosed = false; + +public SerdeRecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSize = maxBatchSize; +} + +@Override +public boolean hasNext() { +checkIfClosed(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch = Optional.empty(); + +return batch; +} + +@Override +public 
void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void checkIfClosed() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private Optional> nextBatches() { +int recordSize = records.sizeInBytes(); +if (bytesRead < recordSize) { +final MemoryRecords memoryRecords; +if (records instanceof MemoryRecords) { +bytesRead = recordSize; +memoryRecords = (MemoryRecords) records; +} else if (records instanceof FileRecords) { +final ByteBuffer buffer; +if (allocatedBuffer.isPresent()) { +buffer = allocatedBuffer.get(); +buffer.compact(); + +if (!buffer.hasRemaining()) { +// The buffer is not big enough to read an entire batch +throw new IllegalStateException( +String.format( +"Unable to read batch from file records buffer %s with maximum batch %s and record size %s", +buffer, +maxBatchSize, +records.sizeInBytes() +) +); +} +} else { +buffer = bufferSupplier.get(Math.min(maxBatchSize, records.sizeInBytes())); +allocatedBuffer = Optional.of(buffer); +
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r61098 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int maxBatchSize; + +private Optional> nextBatches = Optional.empty(); +private Optional> nextBatch = Optional.empty(); +// Buffer used to as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records that read +private int bytesRead = 0; +private boolean isClosed = false; + +public SerdeRecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSize = maxBatchSize; +} + +@Override +public boolean hasNext() { +checkIfClosed(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch = Optional.empty(); + +return batch; +} + +@Override +public 
void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void checkIfClosed() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private Optional> nextBatches() { +int recordSize = records.sizeInBytes(); +if (bytesRead < recordSize) { +final MemoryRecords memoryRecords; +if (records instanceof MemoryRecords) { +bytesRead = recordSize; +memoryRecords = (MemoryRecords) records; +} else if (records instanceof FileRecords) { Review comment: I think most of the code will be the same, but I can try to have two different implementations. Do you mind if I do that in a future PR?
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r610933083 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int maxBatchSize; + +private Optional> nextBatches = Optional.empty(); +private Optional> nextBatch = Optional.empty(); +// Buffer used to as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records that read +private int bytesRead = 0; +private boolean isClosed = false; + +public SerdeRecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSize = maxBatchSize; +} + +@Override +public boolean hasNext() { +checkIfClosed(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch = Optional.empty(); + +return batch; +} + +@Override +public 
void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void checkIfClosed() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private Optional> nextBatches() { Review comment: Yes. Done.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r608180796 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int maxBatchSize; + +private Optional> nextBatches = Optional.empty(); +private Optional> nextBatch = Optional.empty(); +// Buffer used to as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records that read +private int bytesRead = 0; +private boolean isClosed = false; + +public SerdeRecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSize = maxBatchSize; +} + +@Override +public boolean hasNext() { +checkIfClosed(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch = Optional.empty(); + +return batch; +} + +@Override +public 
void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void checkIfClosed() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private Optional> nextBatches() { +int recordSize = records.sizeInBytes(); +if (bytesRead < recordSize) { +final MemoryRecords memoryRecords; +if (records instanceof MemoryRecords) { +bytesRead = recordSize; +memoryRecords = (MemoryRecords) records; +} else if (records instanceof FileRecords) { +final ByteBuffer buffer; +if (allocatedBuffer.isPresent()) { +buffer = allocatedBuffer.get(); +buffer.compact(); + +if (!buffer.hasRemaining()) { +// The buffer is not big enough to read an entire batch +throw new IllegalStateException( +String.format( +"Unable to read batch from file records buffer %s with maximum batch %s and record size %s", +buffer, +maxBatchSize, +records.sizeInBytes() +) +); +} Review comment: To get this new size we need to read the size from the batch record using the `Readable` for example. I don't know if we want to trust this value for memory allocation
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r608180796 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int maxBatchSize; + +private Optional> nextBatches = Optional.empty(); +private Optional> nextBatch = Optional.empty(); +// Buffer used to as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records that read +private int bytesRead = 0; +private boolean isClosed = false; + +public SerdeRecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSize = maxBatchSize; +} + +@Override +public boolean hasNext() { +checkIfClosed(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch = Optional.empty(); + +return batch; +} + +@Override +public 
void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void checkIfClosed() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private Optional> nextBatches() { +int recordSize = records.sizeInBytes(); +if (bytesRead < recordSize) { +final MemoryRecords memoryRecords; +if (records instanceof MemoryRecords) { +bytesRead = recordSize; +memoryRecords = (MemoryRecords) records; +} else if (records instanceof FileRecords) { +final ByteBuffer buffer; +if (allocatedBuffer.isPresent()) { +buffer = allocatedBuffer.get(); +buffer.compact(); + +if (!buffer.hasRemaining()) { +// The buffer is not big enough to read an entire batch +throw new IllegalStateException( +String.format( +"Unable to read batch from file records buffer %s with maximum batch %s and record size %s", +buffer, +maxBatchSize, +records.sizeInBytes() +) +); +} Review comment: To get this new size we need to read the size from the batch record using the `Readable` for example. I don't know if we want to trust this value for memory allocation
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r608006894 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int maxBatchSize; + +private Optional> nextBatches = Optional.empty(); +private Optional> nextBatch = Optional.empty(); +// Buffer used to as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records that read +private int bytesRead = 0; +private boolean isClosed = false; + +public SerdeRecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSize = maxBatchSize; +} + +@Override +public boolean hasNext() { +checkIfClosed(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch = Optional.empty(); + +return batch; +} + +@Override +public 
void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void checkIfClosed() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private Optional> nextBatches() { +int recordSize = records.sizeInBytes(); +if (bytesRead < recordSize) { +final MemoryRecords memoryRecords; +if (records instanceof MemoryRecords) { +bytesRead = recordSize; +memoryRecords = (MemoryRecords) records; +} else if (records instanceof FileRecords) { +final ByteBuffer buffer; +if (allocatedBuffer.isPresent()) { +buffer = allocatedBuffer.get(); +buffer.compact(); + +if (!buffer.hasRemaining()) { +// The buffer is not big enough to read an entire batch +throw new IllegalStateException( +String.format( +"Unable to read batch from file records buffer %s with maximum batch %s and record size %s", +buffer, +maxBatchSize, +records.sizeInBytes() +) +); +} Review comment: @hachikuji @mumrah I am not super happy with this handling. Specially since it is possible for us to write a batch greater than `maxBatchSize` based on how `BatchAccum
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r607999624 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.BatchReader.Batch; +import org.apache.kafka.raft.RecordSerde; + +public final class SerdeRecordsIterator implements Iterator>, AutoCloseable { +private final Records records; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final int maxBatchSize; + +private Optional> nextBatches = Optional.empty(); +private Optional> nextBatch = Optional.empty(); +// Buffer used to as the backing store for nextBatches if needed +private Optional allocatedBuffer = Optional.empty(); +// Number of bytes from records that read +private int bytesRead = 0; +private boolean isClosed = false; + +public SerdeRecordsIterator( +Records records, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSize +) { +this.records = records; +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSize = maxBatchSize; +} + +@Override +public boolean hasNext() { +checkIfClosed(); + +if (!nextBatch.isPresent()) { +nextBatch = nextBatch(); +} + +return nextBatch.isPresent(); +} + +@Override +public Batch next() { +if (!hasNext()) { +throw new NoSuchElementException("Batch iterator doesn't have any more elements"); +} + +Batch batch = nextBatch.get(); +nextBatch = Optional.empty(); + +return batch; +} + +@Override +public 
void close() { +isClosed = true; +allocatedBuffer.ifPresent(bufferSupplier::release); +allocatedBuffer = Optional.empty(); +} + +private void checkIfClosed() { +if (isClosed) { +throw new IllegalStateException("Serde record batch itererator was closed"); +} +} + +private Optional> nextBatches() { +int recordSize = records.sizeInBytes(); +if (bytesRead < recordSize) { +final MemoryRecords memoryRecords; +if (records instanceof MemoryRecords) { +bytesRead = recordSize; +memoryRecords = (MemoryRecords) records; +} else if (records instanceof FileRecords) { +final ByteBuffer buffer; +if (allocatedBuffer.isPresent()) { +buffer = allocatedBuffer.get(); +buffer.compact(); + +if (!buffer.hasRemaining()) { +// The buffer is not big enough to read an entire batch +throw new IllegalStateException( +String.format( +"Unable to read batch from file records buffer %s with maximum batch %s and record size %s", +buffer, +maxBatchSize, +records.sizeInBytes() +) +); +} +} else { +buffer = bufferSupplier.get(Math.min(maxBatchSize, records.sizeInBytes())); +allocatedBuffer = Optional.of(buffer); +
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r605167349 ## File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java ## @@ -60,19 +61,21 @@ @Override void close(); -class Batch { +final class Batch implements Iterable { Review comment: @hachikuji given that we use this type in `SnapshotReader` I am okay moving this type to something like `o.a.k.r.Batch` instead of `o.a.k.r.BatchReader.Batch`.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r600926563 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader reader) { } } +@Override +public synchronized void handleSnapshot(SnapshotReader reader) { +try { +try (SnapshotReader snapshot = reader) { +log.debug("Loading snapshot {}", snapshot.snapshotId()); +for (List batch : snapshot) { +for (Integer value : batch) { +log.debug("Setting value: {}", value); +this.committed = value; +this.uncommitted = value; +} +} +log.debug("Finished loading snapshot. Set value: {}", this.committed); +} +} catch (IOException e) { Review comment: I also changed a few places that were throwing `IOException`
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r600926104 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.raft.RecordSerde; + +public final class SnapshotReader implements Closeable, Iterable> { Review comment: Implemented a `SerdeRecordsIterator` that both `BatchReader` and `SnapshotReader` use internally. The biggest change here was to take into account large `FileRecords`.
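As a rough illustration of the large-`FileRecords` concern, the sketch below reads data through a fixed-size `ByteBuffer` and uses `compact()` to carry a partially read item over to the next refill. It is not the PR's `SerdeRecordsIterator`: the class name `CompactingReadSketch`, the method `readFrames`, and the 4-byte length-prefixed framing are all assumptions made for the example, and an in-memory `ByteBuffer` stands in for the file.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class CompactingReadSketch {
    // Hypothetical helper: reads frames (4-byte length + payload) from `source`
    // through a fixed-size working buffer. This framing is NOT Kafka's batch format.
    static List<byte[]> readFrames(ByteBuffer source, int bufferSize) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize); // starts in "write" mode

        while (source.hasRemaining()) {
            if (!buffer.hasRemaining()) {
                // The leftover partial frame already fills the buffer, so no refill
                // can ever complete it: the frame is larger than the buffer.
                throw new IllegalStateException(
                    "Frame does not fit in a buffer of " + bufferSize + " bytes");
            }

            // Refill: copy as many bytes as fit into the free space.
            int toCopy = Math.min(buffer.remaining(), source.remaining());
            ByteBuffer chunk = source.duplicate();
            chunk.limit(chunk.position() + toCopy);
            buffer.put(chunk);
            source.position(source.position() + toCopy);

            buffer.flip(); // switch to "read" mode
            while (buffer.remaining() >= Integer.BYTES) {
                int length = buffer.getInt(buffer.position()); // peek, do not consume
                if (length < 0) {
                    throw new IllegalStateException("Negative frame length " + length);
                }
                if (buffer.remaining() < Integer.BYTES + length) {
                    break; // frame is incomplete; wait for the next refill
                }
                buffer.getInt(); // consume the length prefix
                byte[] payload = new byte[length];
                buffer.get(payload);
                frames.add(payload);
            }
            buffer.compact(); // move unread bytes to the front, back to "write" mode
        }

        if (buffer.position() > 0) {
            throw new IllegalStateException("Source ended in the middle of a frame");
        }
        return frames;
    }
}
```

The `!buffer.hasRemaining()` check after `compact()` plays the same role as the check in the diff above: if nothing was consumed and the buffer is still full, the only options are to fail or to allocate a larger buffer based on a size read from the data itself.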
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r600925523 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader reader) { } } +@Override +public synchronized void handleSnapshot(SnapshotReader reader) { +try { +try (SnapshotReader snapshot = reader) { +log.debug("Loading snapshot {}", snapshot.snapshotId()); +for (List batch : snapshot) { Review comment: Yes. I missed this. `ReplicatedCounter` now generates a snapshot after more than 10 records. I also fixed the existing simulation tests to take snapshots into account.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r600789602 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader reader) { } } +@Override +public synchronized void handleSnapshot(SnapshotReader reader) { +try { +try (SnapshotReader snapshot = reader) { +log.debug("Loading snapshot {}", snapshot.snapshotId()); +for (List batch : snapshot) { +for (Integer value : batch) { +log.debug("Setting value: {}", value); +this.committed = value; +this.uncommitted = value; +} +} +log.debug("Finished loading snapshot. Set value: {}", this.committed); +} +} catch (IOException e) { Review comment: This is because `SnapshotReader::close` was declared as throwing an `IOException`. This made the API for `SnapshotReader` confusing, as `hasNext` and `next` don't throw an `IOException` even though they read from disk in some cases. I fixed this by changing `SnapshotReader` to implement `AutoCloseable` instead of `Closeable`.
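To make the `Closeable` versus `AutoCloseable` point concrete, here is a minimal sketch. `DemoReader` is a hypothetical stand-in, not the real `SnapshotReader`; it only shows that a class implementing `AutoCloseable` may override `close()` without a `throws` clause, so a try-with-resources block over it needs no `catch (IOException e)`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class AutoCloseableSketch {
    // Hypothetical reader used only for illustration.
    static final class DemoReader implements AutoCloseable, Iterable<List<Integer>> {
        private final List<List<Integer>> batches;
        private boolean closed = false;

        DemoReader(List<List<Integer>> batches) {
            this.batches = batches;
        }

        @Override
        public Iterator<List<Integer>> iterator() {
            return batches.iterator();
        }

        // AutoCloseable.close() is declared with `throws Exception`, but an
        // override may narrow that to no checked exceptions at all.
        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Mirrors the shape of handleSnapshot: iterate all batches, keep the last value.
    static int lastValue(DemoReader reader) {
        int last = 0;
        // No catch block is required because DemoReader.close() throws nothing checked.
        try (DemoReader snapshot = reader) {
            for (List<Integer> batch : snapshot) {
                for (Integer value : batch) {
                    last = value;
                }
            }
        }
        return last;
    }

    public static void main(String[] args) {
        DemoReader reader = new DemoReader(
            Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)));
        System.out.println(lastValue(reader));
        System.out.println(reader.isClosed());
    }
}
```

A `Closeable` implementation could also narrow `close()` in the same way; the sketch only illustrates the caller-side effect of a `close()` that declares no checked exception.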
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r600813565 ## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -418,48 +414,49 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) { public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {} @Override -public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) { -if (logStartOffset() > logStartSnapshotId.offset || -highWatermark.offset < logStartSnapshotId.offset) { +public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) { Review comment: Note that the changes to this method are to relax the log start offset and high-watermark invariant so that we can create more interesting snapshot and log states in the `RaftClientTestContext.Builder`. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1300,18 +1330,18 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( int maxSnapshotSize; try { -maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes()); +maxSnapshotSize = Math.toIntExact(snapshotSize); } catch (ArithmeticException e) { maxSnapshotSize = Integer.MAX_VALUE; } if (partitionSnapshot.position() > Integer.MAX_VALUE) { +// TODO: This should return an error response instead of throwing an exception throw new IllegalStateException(String.format("Trying to fetch a snapshot with position: %d lager than Int.MaxValue", partitionSnapshot.position())); } -UnalignedRecords records = snapshot.read(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize)); - -long snapshotSize = snapshot.sizeInBytes(); +// TODO: I think this slice of records is closed when the snapshot is close in the try (...) above. 
+UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize)); Review comment: When the implementation is a `FileRawSnapshotReader`, the created slice will be closed before the network client has had a chance to send the bytes. Created https://issues.apache.org/jira/browse/KAFKA-12543 and I will work on this after this PR. ## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -355,33 +354,30 @@ public LogFetchInfo read(long startOffset, Isolation isolation) { } ByteBuffer buffer = ByteBuffer.allocate(512); -LogEntry firstEntry = null; +LogOffsetMetadata batchStartOffset = null; Review comment: Note that the changes to this method are so that `read` doesn't return all of the batches from `startOffset` to `highWatermark`. This was needed for more interesting test cases around snapshot loading. ## File path: raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java ## @@ -124,6 +126,15 @@ public void handleCommit(BatchReader reader) { } } +@Override +public void handleSnapshot(SnapshotReader reader) { +// TODO: Create Jira: Handle loading commit in ListenerShim Review comment: Okay. I'll remove the TODO. Do you have a Jira for this? If not let me know and I can create one.
## File path: shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java ## @@ -105,6 +106,22 @@ public void handleCommits(long lastOffset, List messages) { }, null); } +@Override +public void handleSnapshot(SnapshotReader reader) { +// TODO: Create Jira: Need to cover the case where handle snapshot invalidates previous commits +//Need to handle that reader.snapshotId() means that every record up to that offset is committed Review comment: Created this Jira: https://issues.apache.org/jira/browse/KAFKA-12545 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader reader) { } } +@Override +public synchronized void handleSnapshot(SnapshotReader reader) { +try { +try (SnapshotReader snapshot = reader) { +log.debug("Loading snapshot {}", snapshot.snapshotId()); +for (List batch : snapshot) { +for (Integer value : batch) { +log.debug("Setting value: {}", value); +this.committed = value; +this.uncommitted = value; +} +} +log.debug("Finished loading snapshot. Set value: {}", this.committed); +} +} catch (IOException e
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r596996121 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.raft.RecordSerde; + +public final class SnapshotReader implements Closeable, Iterable> { Review comment: Fair enough. Let me revisit this and see what code we can reuse.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r596301164 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -311,8 +311,18 @@ private void updateListenersProgress(long highWatermark) { private void updateListenersProgress(List listenerContexts, long highWatermark) { for (ListenerContext listenerContext : listenerContexts) { listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> { -if (nextExpectedOffset < log.startOffset()) { -listenerContext.fireHandleSnapshot(log.startOffset()); +if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) { +SnapshotReader snapshot = earliestSnapshot().orElseThrow(() -> { Review comment: No good reason. I'll change it to use the latest.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r578522996 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -321,6 +335,23 @@ private void updateListenersProgress(List listenerContexts, lon } } +private Optional> oldestSnapshot() { +if (log.oldestSnapshotId().isPresent()) { Review comment: Thanks. Fixed!