Also, looking back at my logs, I'm wondering whether a producer reuses the
same socket to send data to the same broker for multiple topics (I'm
guessing yes).  If so, it looks like I'm seeing this scenario:

1. producer1 is happily sending messages for topicX and topicY to serverA
(serverA is the leader for both topics, only 1 partition for each topic for
simplicity).
2. serverA is restarted, and in the process, serverB becomes the new leader
for both topicX and topicY.
3. producer1 decides to send a new message to topicX to serverA.
3a. this results in an exception ("Connection reset by peer").  producer1's
connection to serverA is invalidated.
3b. producer1 makes a new metadata request for topicX, and learns that
serverB is now the leader for topicX.
3c. producer1 resends the message to topicX, on serverB.
4. producer1 decides to send a new message to topicY to serverA.
4a. producer1 notes that its socket to serverA is invalid, so it creates a
new connection to serverA.
4b. producer1 successfully sends its message to serverA (without realizing
that serverA is no longer the leader for topicY).
4c. serverA logs to its console:
2013-06-23 08:28:46,770  WARN [kafka-request-handler-2] server.KafkaApis -
[KafkaApi-508818741] Produce request with correlation id 7136261 from
client  on partition [mytopic,0] failed due to Leader not local for
partition [mytopic,0] on broker 508818741
5. producer1 continues to send messages for topicY to serverA, and serverA
continues to log the same messages.
6. 10 minutes later, producer1 decides to update its metadata for topicY,
and learns that serverB is now the leader for topicY.
7. the warning messages finally stop in the console for serverA.
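
To make the sequence concrete, here is a toy Python model of the steps
above.  It is only a sketch of my understanding, not Kafka's producer code:
the Cluster and Producer classes, the "generation" counter used to detect a
stale socket, and run_scenario are all made up for illustration, and it
assumes ack=0, so a misdirected send is dropped without any error reaching
the producer.

```python
# Toy model of the scenario; NOT Kafka code. All names are hypothetical.

class Cluster:
    def __init__(self, leaders):
        self.leaders = dict(leaders)  # topic -> current leader broker
        self.generation = {}          # broker -> number of restarts so far

    def restart(self, broker, new_leader):
        # A restart breaks existing sockets to the broker and moves its
        # leaderships to new_leader (step 2).
        self.generation[broker] = self.generation.get(broker, 0) + 1
        for topic, leader in list(self.leaders.items()):
            if leader == broker:
                self.leaders[topic] = new_leader


class Producer:
    def __init__(self, cluster):
        self.cluster = cluster
        self.cached_leader = dict(cluster.leaders)  # producer's metadata cache
        self.sockets = {}  # broker -> broker generation at connect time
        self.log = []      # (topic, broker, outcome) tuples

    def send(self, topic):
        broker = self.cached_leader[topic]
        current_gen = self.cluster.generation.get(broker, 0)
        if broker not in self.sockets:
            # One socket per broker, shared by all topics (step 4a).
            self.sockets[broker] = current_gen
        if self.sockets[broker] != current_gen:
            # Stale socket: "Connection reset by peer" (step 3a).
            del self.sockets[broker]
            # Only the failed topic's metadata is refreshed (step 3b).
            self.cached_leader[topic] = self.cluster.leaders[topic]
            self.log.append((topic, broker, "connection reset"))
            return self.send(topic)  # resend to the new leader (step 3c)
        if self.cluster.leaders[topic] != broker:
            # ack=0: the broker logs a WARN, the producer sees nothing
            # (steps 4b, 4c, 5).
            self.log.append((topic, broker, "dropped: leader not local"))
        else:
            self.log.append((topic, broker, "ok"))

    def refresh_all(self):
        # Periodic refresh of all topic metadata (step 6).
        self.cached_leader = dict(self.cluster.leaders)


def run_scenario():
    cluster = Cluster({"topicX": "serverA", "topicY": "serverA"})
    producer = Producer(cluster)
    producer.send("topicX")
    producer.send("topicY")
    cluster.restart("serverA", new_leader="serverB")
    producer.send("topicX")  # reset, refresh topicX only, resend to serverB
    producer.send("topicY")  # fresh socket to serverA, silently dropped
    producer.refresh_all()
    producer.send("topicY")  # now reaches serverB
    return producer.log


if __name__ == "__main__":
    for entry in run_scenario():
        print(entry)
```

In this model the topicY send in step 4 goes out on a brand new socket to
serverA, so nothing on the producer side ever signals that the cached
leader is stale.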

I am pretty sure this scenario, or one very close to it, is what I'm seeing
in my logs, after doing a rolling restart, with controlled shutdown.

Does this scenario make sense?

One thing I notice is that in the steady state, the producer refreshes its
metadata for all topics every 10 minutes.  However, when sending a message
to a specific topic fails, only the metadata for that topic is refreshed,
even though the failure implies that every topic with the same leader may
need to be refreshed, especially in response to a "connection reset by
peer".
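
As a rough sketch of what I mean (hypothetical Python, not the actual
producer code; topics_sharing_leader, refresh_after_failure and the
fetch_leader callback are names I made up), a send failure against a broker
would trigger a refresh for every topic whose cached leader is that broker:

```python
# Hypothetical sketch of the suggested behavior; not Kafka's implementation.

def topics_sharing_leader(cached_leader, failed_broker):
    """Return all topics whose cached leader is the broker that just failed."""
    return sorted(t for t, b in cached_leader.items() if b == failed_broker)


def refresh_after_failure(cached_leader, failed_broker, fetch_leader):
    """Return a new cache with every affected topic refreshed.

    fetch_leader(topic) stands in for a metadata request and returns the
    current leader for that topic.
    """
    updated = dict(cached_leader)
    for topic in topics_sharing_leader(cached_leader, failed_broker):
        updated[topic] = fetch_leader(topic)
    return updated


if __name__ == "__main__":
    cached = {"topicX": "serverA", "topicY": "serverA", "topicZ": "serverC"}
    current = {"topicX": "serverB", "topicY": "serverB", "topicZ": "serverC"}
    print(refresh_after_failure(cached, "serverA", current.__getitem__))
```

With something like this, the failure on topicX in step 3 would also fix
the cached leader for topicY, and step 4 would go straight to serverB.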

Jason



On Mon, Jun 24, 2013 at 10:14 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Jun,
>
> To be clear, this whole discussion was started, because I am clearly
> seeing "failed due to Leader not local" on the last broker restarted,
> after all the controlled shutting down has completed and all brokers
> restarted.
>
> This leads me to believe that a client made a metadata request and found
> out that server A was the leader for its partition, and then server A was
> restarted, and then the client makes repeated producer requests to server
> A, without encountering a broken socket.  Thus, I'm not sure it's correct
> that the socket is invalidated in that case after a restart.
>
> Alternatively, could it be that the client (which sends messages to
> multiple topics) gets metadata updates for multiple topics, but doesn't
> attempt to send a message to topicX until after the leader has changed and
> server A has been restarted?  In this case, if it's the first time the
> producer sends to topicX, does it only then create a new socket?
>
> Jason
>
>
> On Mon, Jun 24, 2013 at 10:00 PM, Jun Rao <jun...@gmail.com> wrote:
>
>> That should be fine since the old socket in the producer will no longer be
>> usable after a broker is restarted.
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Mon, Jun 24, 2013 at 9:50 PM, Jason Rosenberg <j...@squareup.com>
>> wrote:
>>
>> > What about a non-controlled shutdown, and a restart, but the producer
>> > never attempts to send anything during the time the broker was down?
>> > That could have caused a leader change, but without the producer
>> > knowing to refresh its metadata, no?
>> >
>> >
>> > On Mon, Jun 24, 2013 at 9:05 PM, Jun Rao <jun...@gmail.com> wrote:
>> >
>> > > Other than controlled shutdown, the only other case that can cause the
>> > > leader to change when the underlying broker is alive is when the
>> > > broker expires its ZK session (likely due to GC), which should be
>> > > rare. That being said, forwarding in the broker may not be a bad
>> > > idea. Could you file a jira to track this?
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Mon, Jun 24, 2013 at 2:50 PM, Jason Rosenberg <j...@squareup.com> wrote:
>> > >
>> > > > Yeah,
>> > > >
>> > > > I see that with ack=0, the producer will be in a bad state anytime
>> > > > the leader for its partition has changed, while the broker that it
>> > > > thinks is the leader is still up.  So this is a problem in general,
>> > > > not only for controlled shutdown, but even for the case where you've
>> > > > restarted a server (without controlled shutdown), which in and of
>> > > > itself can force a leader change.  If the producer doesn't attempt
>> > > > to send a message during the time the broker was down, it will never
>> > > > get a connection failure, never get fresh metadata, and will
>> > > > subsequently start sending messages to the non-leader.
>> > > >
>> > > > Thus, I'd say this is a problem with ack=0, regardless of controlled
>> > > > shutdown.  Any time there's a leader change, the producer will send
>> > > > messages into the ether.  I think this is actually a severe
>> > > > condition, that could be considered a bug.  How hard would it be to
>> > > > have the receiving broker forward on to the leader, in this case?
>> > > >
>> > > > Jason
>> > > >
>> > > >
>> > > > On Mon, Jun 24, 2013 at 8:44 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>> > > >
>> > > > > I think Jason was suggesting quiescent time as a possibility only
>> > > > > if the broker did request forwarding if it is not the leader.
>> > > > >
>> > > > > On Monday, June 24, 2013, Jun Rao wrote:
>> > > > >
>> > > > > > Jason,
>> > > > > >
>> > > > > > The quiescence time that you proposed won't work. The reason is
>> > > > > > that with ack=0, the producer starts losing data silently from
>> > > > > > the moment the leader is moved (by controlled shutdown) until
>> > > > > > the broker is shut down. So, the sooner that you can shut down
>> > > > > > the broker, the better. What we realized is that if you can use
>> > > > > > a larger batch size, ack=1 can still deliver very good
>> > > > > > throughput.
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Jun
>> > > > > >
>> > > > > >
>> > > > > > On Mon, Jun 24, 2013 at 12:22 AM, Jason Rosenberg <j...@squareup.com> wrote:
>> > > > > >
>> > > > > > > Yeah I am using ack = 0, so that makes sense.  I'll need to
>> > > > > > > rethink that, it would seem.  It would be nice, wouldn't it,
>> > > > > > > in this case, for the broker to realize this and just forward
>> > > > > > > the messages to the correct leader.  Would that be possible?
>> > > > > > >
>> > > > > > > Also, it would be nice to have a second option to the
>> > > > > > > controlled shutdown (e.g. controlled.shutdown.quiescence.ms),
>> > > > > > > to allow the broker to wait after the controlled shutdown, a
>> > > > > > > prescribed amount of time before actually shutting down the
>> > > > > > > server. Then, I could set this value to something a little
>> > > > > > > greater than the producer's
>> > > > > > > 'topic.metadata.refresh.interval.ms'.  This would help with
>> > > > > > > hitless rolling restarts too.  Currently, every producer gets
>> > > > > > > a very loud "Connection Reset" with a tall stack trace each
>> > > > > > > time I restart a broker.  Would be nicer to have the producers
>> > > > > > > still be able to produce until the metadata refresh interval
>> > > > > > > expires, then get the word that the leader has moved due to
>> > > > > > > the controlled shutdown, and then start producing to the new
>> > > > > > > leader, all before the shutting down server actually shuts
>> > > > > > > down.  Does that seem feasible?
>> > > > > > >
>> > > > > > > Jason
>> > > > > > >
>> > > > > > >
>> > > > > > > On Sun, Jun 23, 2013 at 8:23 PM, Jun Rao <jun...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Jason,
>> > > > > > > >
>> > > > > > > > Are you using ack = 0 in the producer? This mode doesn't
>> > > > > > > > work well with controlled shutdown (this is explained in
>> > > > > > > > FAQ in
>> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#)
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > >
>> > > > > > > > Jun
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Sun, Jun 23, 2013 at 1:45 AM, Jason Rosenberg <j...@squareup.com> wrote:
>> > > > > > > >
>> > > > > > > > > I'm working on having seamless rolling restarts for my
>> > > > > > > > > kafka servers, running 0.8.  I have it so that each server
>> > > > > > > > > will be restarted sequentially.  Each server takes itself
>> > > > > > > > > out of the load balancer (e.g. sets a status that the lb
>> > > > > > > > > will recognize, and then waits more than long enough for
>> > > > > > > > > the lb to stop sending meta-data requests to that server).
>> > > > > > > > > Then I initiate the shutdown (with
>> > > > > > > > > controlled.shutdown.enable=true).  This seems to work well,
>> > > > > > > > > however, I occasionally see warnings like this in the log
>> > > > > > > > > from the server, after restart:
>> > > > > > > > >
>> > > > > > > > > 2013-06-23 08:28:46,770  WARN [kafka-request-handler-2]
>> > > > > > > > > server.KafkaApis - [KafkaApi-508818741] Produce request
>> > > > > > > > > with correlation id 7136261 from client  on partition
>> > > > > > > > > [mytopic,0] failed due to Leader not local for partition
>> > > > > > > > > [mytopic,0] on broker 508818741
>> > > > > > > > >
>> > > > > > > > > This WARN seems to persistently repeat, until the producer
>> > > > > > > > > client initiates a new meta-data request (e.g. every 10
>> > > > > > > > > minutes, by default).  However, the producer doesn't log
>> > > > > > > > > any errors/exceptions when the server is logging this
>> > > > > > > > > WARN.
>> > > > > > > > >
>> > > > > > > > > What's happening here?  Is the message silently being
>> > > > > > > > > forwarded on to the correct leader for the partition?  Is
>> > > > > > > > > the message dropped?  Are these WARNs particularly useful?
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > >
>> > > > > > > > > Jason
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
