@wangguozhang,could you give me some advices.

> 在 2016年9月22日,下午6:56,Kafka <kafka...@126.com> 写道:
> 
> Hi all,       
>       in terms of topic, we create a topic with 6 partition,and each with 3 
> replicas.
>        in terms of producer,when we send message with ack -1 using sync 
> interface.
>       in terms of brokers,we set min.insync.replicas to 2.
> 
> after we review the kafka broker’s code,we know that we send a message to 
> broker with ack -1, then we can get response if ISR of this partition is 
> great than or equal to min.insync.replicas,but what confused
> me is replicas in ISR is not strongly consistent,in kafka 0.9 we use 
> replica.lag.time.max.ms param to judge whether to shrink ISR, and the 
> defaults is 10000 ms, so replicas’ data in isr can lag 10000ms at most,
> we we restart broker which own this partitions’ leader, then controller will 
> start a new leader election, which will choose the first replica in ISR that 
> not equals to current leader as new leader, then this will loss data.
> 
> 
> The main produce handle code shows below:
> val numAcks = curInSyncReplicas.count(r => {
>          if (!r.isLocal)
>            if (r.logEndOffset.messageOffset >= requiredOffset) {
>              trace("Replica %d of %s-%d received offset 
> %d".format(r.brokerId, topic, partitionId, requiredOffset))
>              true
>            }
>            else
>              false
>          else
>            true /* also count the local (leader) replica */
>        })
> 
>        trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, 
> topic, partitionId))
> 
>        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
> 
>        if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
>          /*
>          * The topic may be configured not to accept messages if there are 
> not enough replicas in ISR
>          * in this scenario the request was already appended locally and then 
> added to the purgatory before the ISR was shrunk
>          */
>          if (minIsr <= curInSyncReplicas.size) {
>            (true, ErrorMapping.NoError)
>          } else {
>            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>          }
>        } else
>          (false, ErrorMapping.NoError)
> 
> 
> why only logging unAcks and not use numAcks to compare with minIsr, if 
> numAcks is 0, but curInSyncReplicas.size >= minIsr, then this will return, as 
> ISR shrink procedure is not real time, does this will loss data after leader 
> election?
> 
> Feedback is greatly appreciated. Thanks.
> meituan.inf
> 
> 
> 


Reply via email to