Jose Armando Garcia Sancio created KAFKA-13148: --------------------------------------------------
Summary: Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE Key: KAFKA-13148 URL: https://issues.apache.org/jira/browse/KAFKA-13148 Project: Kafka Issue Type: Bug Components: controller, kraft Reporter: Jose Armando Garcia Sancio In some cases the RaftClient will return Long.MAX_VALUE: {code:java} /** * Append a list of records to the log. The write will be scheduled for some time * in the future. There is no guarantee that appended records will be written to * the log and eventually committed. However, it is guaranteed that if any of the * records become committed, then all of them will be. * * If the provided current leader epoch does not match the current epoch, which * is possible when the state machine has yet to observe the epoch change, then * this method will return {@link Long#MAX_VALUE} to indicate an offset which is * not possible to become committed. The state machine is expected to discard all * uncommitted entries after observing an epoch change. * * @param epoch the current leader epoch * @param records the list of records to append * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records could * be committed; null if no memory could be allocated for the batch at this time * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum * batch size; if this exception is throw none of the elements in records were * committed */ Long scheduleAtomicAppend(int epoch, List<T> records); {code} The controller doesn't handle this case: {code:java} // If the operation returned a batch of records, those records need to be // written before we can return our result to the user. Here, we hand off // the batch of records to the raft client. They will be written out // asynchronously. final long offset; if (result.isAtomic()) { offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records()); } else { offset = raftClient.scheduleAppend(controllerEpoch, result.records()); } op.processBatchEndOffset(offset); writeOffset = offset; resultAndOffset = ControllerResultAndOffset.of(offset, result); for (ApiMessageAndVersion message : result.records()) { replay(message.message(), Optional.empty(), offset); } snapshotRegistry.getOrCreateSnapshot(offset); log.debug("Read-write operation {} will be completed when the log " + "reaches offset {}.", this, resultAndOffset.offset()); {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)