Jun,


Yeah, I probably have an off-by-one issue in the HW description. I
think you could pick any number here and the problem remains -- could
you read through the steps I posted and see if they logically make
sense, numbers aside?

We definitely lost data in 4 partitions of the 8,000 that were elected,
and there was only a single election for each partition. We had done a
rolling restart hours before, but that had been done for a long time and
everything was stable. We do not allow automatic election, it's a manual
process that we initiate after the cluster has stabilized.


So in this case, I still don't think any discussion about multiple-
failovers is germane to the problem we saw. Each of our partitions only
had a single failover, and yet 4 of them still truncated committed data.


--

Mark Smith

m...@qq.is





On Mon, Nov 21, 2016, at 05:12 PM, Jun Rao wrote:

> Hi, Mark,

> 

> Hmm, the committing of a message at offset X is the equivalent of
> saying that the HW is at offset X + 1. So, in your example, if the
> producer publishes a new message at offset 37, this message won't be
> committed (i.e., HW moves to offset 38) until the leader sees the
> follower fetch from offset 38 (not offset 37). At that point, the
> follower would have received message at offset 37 in the fetch
> response and appended that message to its local log. If the follower
> now becomes the new leader, message at offset 37 is preserved.
> 

> The problem that I described regarding data loss can happen during a
> rolling restart. Suppose that you have 3 replicas A, B, and C. Let's
> say A is the preferred the leader, but during the deployment, the
> leader gets moved to replica B at some point and all 3 replicas are in
> sync. A new message is produced at offset 37 and is committed
> (leader's HW =38). However, the HW in replica A is still at 37. Now,
> we try to shutdown broker B and the leader gets moved to replica C.
> Replica A starts to follow replica C and it first truncates to HW 37,
> which removes the message at offset 37. Now, preferred leader logic
> kicks in and the leadership switches again to replica A. Since A
> doesn't have message at offset 37 any more and all followers copy
> messages from replica A, message at offset 37 is lost.
> 

> With KAFKA-3670, in the above example, when shutting down broker B,
> the leader will be directly moved to replica A since it's a preferred
> replica. So the above scenario won't happen.
> 

> The more complete fix is in KAFKA-1211. The logic for getting the
> latest generation snapshot is just a proposal and is not in the code
> base yet.
> 

> Thanks,

> 

> Jun

> 

> On Mon, Nov 21, 2016 at 3:20 PM, Mark Smith <m...@qq.is> wrote:

>> Jun,

>> 

>>  Thanks for the reply!

>> 

>>  I am aware the HW won't advance until the in-sync replicas have

>>  _requested_ the messages. However, I believe the issue is that the

>>  leader has no guarantee the replicas have _received_ the fetch
>>  response.
>>  There is no second-phase to the commit.

>> 

>>  So, in the particular case where a leader transition happens, I
>>  believe
>>  this race condition exists (and I'm happy to be wrong here, but
>>  it looks
>>  feasible and explains the data loss I saw):

>> 

>>  1. Starting point: Leader and Replica both only have up to message
>>     #36
>>  2. Client produces new message with required.acks=all

>>  3. Leader commits message #37, but HW is still #36, the produce
>>     request
>>  is blocked

>>  4. Replica fetches messages (leader has RECEIVED the fetch request)
>>  5. Leader then advances HW to #37 and unblocks the produce request,
>>  client believes it's durable

>>  6. PREFERRED REPLICA ELECTION BEGIN

>>  7. Replica starts become-leader process

>>  8. Leader finishes sending fetch response, replica is just now
>>     seeing
>>  message #37

>>  9. Replica throws away fetch response from step 4 because it is now
>>  becoming leader (partition has been removed from partitionMap so it
>>  looks like data is ignored)

>>  10. Leader starts become-follower

>>  11. Leader truncates to replica HW offset of #36

>>  12. Message #37 was durably committed but is now lost

>> 

>>  For the tickets you linked:

>> 

>> https://issues.apache.org/jira/browse/KAFKA-3670

>>  * There was no shutdown involved in this case, so this shouldn't be
>>  impacting.

>> 

>> https://issues.apache.org/jira/browse/KAFKA-1211

>>  * I've read through this but I'm not entirely sure if it
>>    addresses the
>>  above. I don't think it does, though. I don't see a step in the
>>  ticket
>>  about become-leader making a call to the old leader to get the
>>  latest
>>  generation snapshot?

>> 
>>  --
>>  Mark Smith
>> m...@qq.is

>> 

>> On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:

>>  > Mark,

>>  >

>>  > Thanks for reporting this. First, a clarification. The HW is
>>  > actually
>>  > never

>>  > advanced until all in-sync followers have fetched the
>>  > corresponding
>>  > message. For example, in step 2, if all follower replicas issue a
>>  > fetch
>>  > request at offset 10, it serves as an indication that all replicas
>>  > have
>>  > received messages up to offset 9. So,only then, the HW is
>>  > advanced to
>>  > offset 10 (which is not inclusive).

>>  >

>>  > I think the problem that you are seeing are probably caused by two
>>  > known
>>  > issues. The first one is

>>  > https://issues.apache.org/jira/browse/KAFKA-1211.

>>  > The issue is that the HW is propagated asynchronously from the
>>  > leader to
>>  > the followers. If the leadership changes multiple time very
>>  > quickly, what
>>  > can happen is that a follower first truncates its data up to HW
>>  > and then
>>  > immediately becomes the new leader. Since the follower's HW may
>>  > not be up
>>  > to date, some previously committed messages could be lost. The
>>  > second one
>>  > is https://issues.apache.org/jira/browse/KAFKA-3670. The issue is
>>  > that
>>  > controlled shutdown and leader balancing can cause leadership to
>>  > change
>>  > more than once quickly, which could expose the data loss problem
>>  > in the
>>  > first issue.

>>  >

>>  > The second issue has been fixed in 0.10.0. So, if you upgrade to
>>  > that
>>  > version or above, it should reduce the chance of hitting the first
>>  > issue
>>  > significantly. We are actively working on the first issue and
>>  > hopefully
>>  > it

>>  > will be addressed in the next release.

>>  >

>>  > Jun

>>  >

>>  > On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <m...@qq.is> wrote:

>>  >

>>  > > Hey folks,

>>  > >

>>  > > I work at Dropbox and I was doing some maintenance yesterday and
>>  > > it
>>  > > looks like we lost some committed data during a preferred
>>  > > replica
>>  > > election. As far as I understand this shouldn't happen, but I
>>  > > have a
>>  > > theory and want to run it by ya'll.

>>  > >

>>  > > Preamble:

>>  > > * Kafka 0.9.0.1

>>  > > * required.acks = -1 (All)

>>  > > * min.insync.replicas = 2 (only 2 replicas for the partition, so
>>  > >   we
>>  > > require both to have the data)

>>  > > * consumer is Kafka Connect

>>  > > * 1400 topics, total of about 15,000 partitions

>>  > > * 30 brokers

>>  > >

>>  > > I was performing some rolling restarts of brokers yesterday as
>>  > > part of
>>  > > our regular DRT (disaster testing) process and at the end that
>>  > > always
>>  > > leaves many partitions that need to be failed back to the
>>  > > preferred
>>  > > replica. There were about 8,000 partitions that needed moving. I
>>  > > started
>>  > > the election in Kafka Manager and it worked, but it looks like 4
>>  > > of
>>  > > those 8,000 partitions experienced some relatively small amount
>>  > > of data
>>  > > loss at the tail.

>>  > >

>>  > > From the Kafka Connect point of view, we saw a handful of these:
>>  > >

>>  > > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-
>>  > > 5]
>>  > > INFO Fetch offset 67614479952 is out of range, resetting offset
>>  > > (o.a.k.c.c.i.Fetcher:595)

>>  > >

>>  > > I believe that was because it asked the new leader for data and
>>  > > the new
>>  > > leader had less data than the old leader. Indeed, the old leader
>>  > > became
>>  > > a follower and immediately truncated:

>>  > >

>>  > > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log

>>  > > goscribe.client-host_activity-21 to offset 67614479601.

>>  > >

>>  > > Given the above production settings I don't know why KC would
>>  > > ever see
>>  > > an OffsetOutOfRange error but this caused KC to reset to the
>>  > > beginning
>>  > > of the partition. Various broker logs for the failover paint the
>>  > > following timeline:

>>  > > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
>>  > >

>>  > > My current working theory that I'd love eyes on:

>>  > >

>>  > >   1. Leader receives produce request and appends to log,
>>  > >      incrementing
>>  > >   LEO, but given the durability requirements the HW is not
>>  > >   incremented
>>  > >   and the produce response is delayed (normal)

>>  > >

>>  > >   2. Replica sends Fetch request to leader as part of normal
>>  > >      replication
>>  > >   flow

>>  > >

>>  > >   3. Leader increments HW when it STARTS to respond to the Fetch
>>  > >      request
>>  > >   (per fetchMessages in ReplicaManager.scala), so the HW is
>>  > >   updated as
>>  > >   soon as we've prepared messages for response -- importantly
>>  > >   the HW is
>>  > >   updated even though the replica has not yet actually seen the
>>  > >   messages, even given the durability settings we've got

>>  > >

>>  > >   4. Meanwhile, Kafka Connect sends Fetch request to leader and
>>  > >      receives
>>  > >   the messages below the new HW, but the messages have not
>>  > >   actually been
>>  > >   received by the replica yet still

>>  > >

>>  > >   5. Preferred replica election begins (oh the travesty!)

>>  > >

>>  > >   6. Replica starts the become-leader process and makeLeader
>>  > >      removes
>>  > >   this partition from partitionMap, which means when the
>>  > >   response comes
>>  > >   in finally, we ignore it (we discard the old-leader committed
>>  > >   messages)

>>  > >

>>  > >   7. Old-leader starts become-follower process and truncates to
>>  > >      the HW
>>  > >   of the new-leader i.e. the old-leader has now thrown away data
>>  > >   it had
>>  > >   committed and given out moments ago

>>  > >

>>  > >   8. Kafka Connect sends Fetch request to the new-leader but its
>>  > >      offset
>>  > >   is now greater than the HW of the new-leader, so we get the

>>  > >   OffsetOutOfRange error and restart

>>  > >

>>  > > Can someone tell me whether or not this is plausible? If it is,
>>  > > is there
>>  > > a known issue/bug filed for it? I'm not exactly sure what the
>>  > > solution
>>  > > is, but it does seem unfortunate that a normal operation (leader
>>  > > election with both brokers alive and well) can result in the
>>  > > loss of
>>  > > committed messages.

>>  > >

>>  > > And, if my theory doesn't hold, can anybody explain what
>>  > > happened? I'm
>>  > > happy to provide more logs or whatever.

>>  > >

>>  > > Thanks!

>>  > >

>>  > >

>>  > > --

>>  > > Mark Smith

>>  > > m...@qq.is

>>  > >


Reply via email to