[
https://issues.apache.org/jira/browse/KAFKA-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jason Gustafson resolved KAFKA-13148.
-------------------------------------
Resolution: Fixed
Closing this since it was fixed by KAFKA-12158.
> Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE
> -----------------------------------------------------------------------
>
> Key: KAFKA-13148
> URL: https://issues.apache.org/jira/browse/KAFKA-13148
> Project: Kafka
> Issue Type: Bug
> Components: controller, kraft
> Reporter: Jose Armando Garcia Sancio
> Assignee: Niket Goel
> Priority: Major
> Labels: kip-500
>
> In some cases the RaftClient will return Long.MAX_VALUE:
> {code:java}
> /**
> * Append a list of records to the log. The write will be scheduled for
> some time
> * in the future. There is no guarantee that appended records will be
> written to
> * the log and eventually committed. However, it is guaranteed that if
> any of the
> * records become committed, then all of them will be.
> *
> * If the provided current leader epoch does not match the current
> epoch, which
> * is possible when the state machine has yet to observe the epoch
> change, then
> * this method will return {@link Long#MAX_VALUE} to indicate an offset
> which is
> * not possible to become committed. The state machine is expected to
> discard all
> * uncommitted entries after observing an epoch change.
> *
> * @param epoch the current leader epoch
> * @param records the list of records to append
> * @return the expected offset of the last record; {@link
> Long#MAX_VALUE} if the records could
> * be committed; null if no memory could be allocated for the
> batch at this time
> * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException
> if the size of the records is greater than the maximum
> * batch size; if this exception is throw none of the elements
> in records were
> * committed
> */
> Long scheduleAtomicAppend(int epoch, List<T> records);
> {code}
> The controller doesn't handle this case:
> {code:java}
> // If the operation returned a batch of records, those
> records need to be
> // written before we can return our result to the user.
> Here, we hand off
> // the batch of records to the raft client. They will be
> written out
> // asynchronously.
> final long offset;
> if (result.isAtomic()) {
> offset =
> raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
> } else {
> offset = raftClient.scheduleAppend(controllerEpoch,
> result.records());
> }
> op.processBatchEndOffset(offset);
> writeOffset = offset;
> resultAndOffset = ControllerResultAndOffset.of(offset,
> result);
> for (ApiMessageAndVersion message : result.records()) {
> replay(message.message(), Optional.empty(), offset);
> }
> snapshotRegistry.getOrCreateSnapshot(offset);
> log.debug("Read-write operation {} will be completed when
> the log " +
> "reaches offset {}.", this, resultAndOffset.offset());
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)