Hi,
Here is the doc comment and code of ReplicaFetcherThread.handleOffsetOutOfRange, which
may be the answer:
/**
 * Handle a partition whose offset is out of range and return a new fetch offset.
 */
def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
  val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get

  /**
   * Unclean leader election: A follower goes down, in the meanwhile the leader keeps
   * appending messages. The follower comes back up and before it has completely caught
   * up with the leader's logs, all replicas in the ISR go down. The follower is now
   * uncleanly elected as the new leader, and it starts appending messages from the
   * client. The old leader comes back up, becomes a follower and it may discover that
   * the current leader's end offset is behind its own end offset.
   *
   * In such a case, truncate the current follower's log to the current leader's end
   * offset and continue fetching.
   *
   * There is a potential for a mismatch between the logs of the two replicas here.
   * We don't fix this mismatch as of now.
   */
  val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition,
    OffsetRequest.LatestTime, brokerConfig.brokerId)
  if (leaderEndOffset < replica.logEndOffset.messageOffset) {
    // Prior to truncating the follower's log, ensure that doing so is not disallowed by
    // the configuration for unclean leader election. This situation could only happen
    // if the unclean election configuration for a topic changes while a replica is down.
    // Otherwise, we should never encounter this situation since a non-ISR leader cannot
    // be elected if disallowed by the broker configuration.
    if (!LogConfig.fromProps(brokerConfig.toProps,
        AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
          topicAndPartition.topic)).uncleanLeaderElectionEnable) {
      // Log a fatal error and shutdown the broker to ensure that data loss does not
      // unexpectedly occur.
      fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
        " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId,
            replica.logEndOffset.messageOffset))
      Runtime.getRuntime.halt(1)
    }

    replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
    warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
      .format(brokerConfig.brokerId, topicAndPartition,
        replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
    leaderEndOffset
  } else {
    /**
     * The follower could have been down for a long time and when it starts up, its end
     * offset could be smaller than the leader's start offset because the leader has
     * deleted old logs (log.logEndOffset < leaderStartOffset).
     *
     * Roll out a new log at the follower with the start offset equal to the current
     * leader's start offset and continue fetching.
     */
    val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition,
      OffsetRequest.EarliestTime, brokerConfig.brokerId)
    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
    warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
      .format(brokerConfig.brokerId, topicAndPartition,
        replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
    leaderStartOffset
  }
}

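
In case it helps to see the branches in isolation, here is a minimal sketch of the decision that method makes, reduced to a pure function. It is not Kafka code: the names Outcome, HaltBroker, TruncateTo, RestartAt and decide are made up for illustration, and the leader start offset of 0 used in the first two calls is an assumption, since the FATAL message does not report it.

```scala
// Hypothetical, simplified model of the branches in handleOffsetOutOfRange.
// None of these types exist in Kafka; they only name the three outcomes.
sealed trait Outcome
case object HaltBroker extends Outcome                    // fatal(...) + Runtime.halt(1)
final case class TruncateTo(offset: Long) extends Outcome // truncate to leader's end offset
final case class RestartAt(offset: Long) extends Outcome  // new log at leader's start offset

object OffsetOutOfRangeSketch {
  def decide(leaderStartOffset: Long,
             leaderEndOffset: Long,
             followerEndOffset: Long,
             uncleanLeaderElectionEnable: Boolean): Outcome =
    if (leaderEndOffset < followerEndOffset) {
      // The follower is ahead of the leader. Truncating would drop messages,
      // so it is only done when unclean leader election is enabled.
      if (!uncleanLeaderElectionEnable) HaltBroker
      else TruncateTo(leaderEndOffset)
    } else {
      // The follower is behind the leader's earliest retained offset.
      RestartAt(leaderStartOffset)
    }

  def main(args: Array[String]): Unit = {
    // Offsets from the FATAL message: leader 2003 at 20, replica 2004 at 21.
    // The leader start offset (0) is assumed, not taken from the log.
    println(decide(0L, 20L, 21L, uncleanLeaderElectionEnable = false))
    println(decide(0L, 20L, 21L, uncleanLeaderElectionEnable = true))
    // Made-up offsets for the "follower was down for a long time" branch.
    println(decide(100L, 500L, 50L, uncleanLeaderElectionEnable = false))
  }
}
```

With the offsets from your log and unclean.leader.election.enable=false, the sketch lands in the HaltBroker branch, which matches the halt you observed. With the setting enabled it would instead truncate the replica to offset 20.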
2016-02-25 14:38 GMT+08:00 Avanish Mishra <[email protected]>:
> We are running a 10-node Kafka cluster in a test setup with a replication factor
> of 3 and topics with min.insync.replicas set to 2.
> Recently I noticed that a few nodes halted on restart after a multiple-node
> failure, with the FATAL message:
>
> "Halting because log truncation is not allowed for topic 1613_spam,
> Current leader 2003's latest offset 20 is less than replica 2004's latest
> offset 21 (kafka.server.ReplicaFetcherThread)"
> My understanding is that this can happen if there is a slow replica in the ISR
> which doesn't have the latest committed message and high watermark. As
> min.insync.replicas is 2, a write will be committed when it completes on the
> leader and 1 follower. Since replica.lag.time.max.ms is set to 10000,
> any slow replica can stay in the ISR for the last 10 sec without fetching any
> message. If the leader goes down within that interval and the slow follower is
> elected as leader, this will result in a new leader with an offset less than the
> follower's. Is this explanation correct, or am I missing something? What is
> the best way to recover the committed message in such a situation?
>
> We are running the cluster with the following settings:
> - replication factor 3
> - min.insync.replicas is set to 2
> - request.required.acks -1
> - unclean.leader.election.enable is set to false
> - replica.lag.time.max.ms is 10000
> - replica.high.watermark.checkpoint.interval.ms 1000
>
>
> Thanks
> Avanish