Hi all,
        On the topic side, we created a topic with 6 partitions, each with 3
replicas.
        On the producer side, we send messages with acks = -1 using the sync
interface.
        On the broker side, we set min.insync.replicas to 2.

After reviewing the Kafka broker code, we know that when we send a message to
the broker with acks = -1, we get a response once the ISR size of the partition
is greater than or equal to min.insync.replicas. What confuses us is that the
replicas in the ISR are not strongly consistent. Kafka 0.9 uses the
replica.lag.time.max.ms parameter to decide whether to shrink the ISR, and its
default is 10000 ms, so a replica in the ISR can lag by up to 10000 ms.
If we restart the broker that owns the partition's leader, the controller will
start a new leader election and choose the first replica in the ISR that is
not the current leader as the new leader, which would lose data.
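
To make the timing concrete, here is a toy Scala model of the time-based ISR rule as we understand it. FollowerState, outOfSync and the sample timestamps are made-up names and values for illustration, not Kafka's classes; only the 10000 ms default comes from replica.lag.time.max.ms.

```scala
// Toy model, not Kafka code: a follower is dropped from the ISR only after
// it has gone longer than maxLagMs without catching up to the leader.
case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)

object IsrLagSketch {
  val maxLagMs = 10000L // default of replica.lag.time.max.ms in 0.9

  // Hypothetical helper: followers that should be removed from the ISR.
  def outOfSync(followers: Seq[FollowerState], nowMs: Long): Seq[FollowerState] =
    followers.filter(f => nowMs - f.lastCaughtUpTimeMs > maxLagMs)

  def main(args: Array[String]): Unit = {
    val nowMs = 50000L
    val followers = Seq(
      FollowerState(2, lastCaughtUpTimeMs = 41000L), // last caught up 9 s ago: stays in ISR
      FollowerState(3, lastCaughtUpTimeMs = 39000L)  // last caught up 11 s ago: removed
    )
    println(outOfSync(followers, nowMs).map(_.brokerId))
  }
}
```

In this model broker 2 is still an ISR member even though it may be missing up to 9 seconds of the leader's log, which is the window we are worried about.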


The main produce-handling code is shown below:
val numAcks = curInSyncReplicas.count(r => {
  if (!r.isLocal)
    if (r.logEndOffset.messageOffset >= requiredOffset) {
      trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
      true
    }
    else
      false
  else
    true /* also count the local (leader) replica */
})

trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))

val minIsr = leaderReplica.log.get.config.minInSyncReplicas

if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
  /*
   * The topic may be configured not to accept messages if there are not enough replicas in ISR
   * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
   */
  if (minIsr <= curInSyncReplicas.size) {
    (true, ErrorMapping.NoError)
  } else {
    (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
  }
} else
  (false, ErrorMapping.NoError)


Why is numAcks only logged and not compared with minIsr? If numAcks is less
than minIsr (for example, only the leader is counted) but
curInSyncReplicas.size >= minIsr, the request will still return. Since ISR
shrinking does not happen in real time, will this lose data after a leader
election?
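
To pin down what we are asking, here is a small self-contained Scala model of the check. It is only a sketch: ReplicaState, CheckResult, check and the sample offsets are hypothetical, and it assumes the leader's high watermark is the smallest log end offset among the ISR members, which is our reading of the broker code and may be wrong.

```scala
// Toy model of the quoted check, not Kafka's classes.
case class ReplicaState(brokerId: Int, isLocal: Boolean, logEndOffset: Long)
case class CheckResult(completed: Boolean, notEnoughReplicas: Boolean, numAcks: Int)

object AckCheckSketch {
  // Assumption: high watermark = minimum log end offset across the ISR.
  def highWatermark(isr: Seq[ReplicaState]): Long = isr.map(_.logEndOffset).min

  // As in the quoted code, numAcks is informational only; the high watermark
  // decides completion and the ISR size decides the error code.
  def check(isr: Seq[ReplicaState], requiredOffset: Long, minIsr: Int): CheckResult = {
    val numAcks = isr.count(r => r.isLocal || r.logEndOffset >= requiredOffset)
    val completed = highWatermark(isr) >= requiredOffset
    CheckResult(completed, completed && minIsr > isr.size, numAcks)
  }

  def main(args: Array[String]): Unit = {
    // Leader (broker 1) has the message at offset 100; followers lag behind.
    val lagging = Seq(
      ReplicaState(1, true, 100L),
      ReplicaState(2, false, 90L),
      ReplicaState(3, false, 95L)
    )
    val caughtUp = lagging.map(_.copy(logEndOffset = 100L))
    println(check(lagging, 100L, 2))  // followers behind
    println(check(caughtUp, 100L, 2)) // followers caught up
  }
}
```

If the high watermark assumption holds, the lagging case is not completed (numAcks is 1) and the request only completes once every ISR member has reached the required offset, so a separate numAcks comparison would be redundant. We would appreciate confirmation of whether that is how the broker really behaves.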

Feedback is greatly appreciated. Thanks.
meituan.inf


