Hi Sriram,

I don't see any indication at all on the producer that there's a problem,
only the above logging on the server (and it repeats continually).  I
think what may be happening is that the producer for that topic never
actually tried to send a message between the start of the controlled
shutdown (which changed the leader for the topic) and the time the server
was restarted.  So the client never sees that the leader changed, but it
also never gets an exception back, so it just keeps sending messages to
the former leader.
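
One thing I'm considering, since the default metadata refresh quoted below
is 10 minutes, is lowering topic.metadata.refresh.interval.ms on the
producer so a leader change gets picked up sooner even when no send fails.
This is only a rough sketch of what I have in mind with the 0.8 java
producer, if I'm reading the config docs right; the broker list, topic
name, and 60s interval are placeholders for illustration:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MetadataRefreshSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder VIP/port; in our setup this points at the load balancer.
        props.put("metadata.broker.list", "mykafkavip:12345");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Refresh topic metadata every 60s instead of the 10-minute default,
        // so a leader change is noticed even if no send has failed in between.
        props.put("topic.metadata.refresh.interval.ms", "60000");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("mytopic", "test message"));
        producer.close();
    }
}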

I do see the sequence you describe for errors related to a broken
connection (e.g. when the server gets restarted as part of the rolling
restart and the producer actually tries to send a message while the server
is down).  In that case I do see the following on the client (I've renamed
identifying topic/host names here):

2013-06-23 08:25:28,420  WARN [ProducerSendThread-] async.DefaultEventHandler - Failed to send producer request with correlation id 474527 to broker 508818741 with data for partitions [mytopic,0]
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.writev0(Native Method)
        at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:33)
        at sun.nio.ch.IOUtil.write(IOUtil.java:125)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:367)
        at java.nio.channels.SocketChannel.write(SocketChannel.java:360)
        at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
        at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
        at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
2013-06-23 08:25:28,421  INFO [ProducerSendThread-] async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 3
2013-06-23 08:25:28,521  INFO [ProducerSendThread-] client.ClientUtils$ - Fetching metadata from broker id:0,host:mykafkavip:12345 with correlation id 474528 for 1 topic(s) Set(mytopic)
2013-06-23 08:25:28,522  INFO [ProducerSendThread-] producer.SyncProducer - Connected to mykafkavip:12345 for producing
2013-06-23 08:25:28,524  INFO [ProducerSendThread-] producer.SyncProducer - Disconnecting from mykafkavip:12345
2013-06-23 08:25:28,525  INFO [ProducerSendThread-] producer.SyncProducer - Connected to kafkaserver1:12345 for producing
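
So in the broken-connection case the retry path clearly kicks in.  For
completeness, the 100 ms backoff and "Remaining retries = 3" in that log
line up with what I believe are the defaults for retry.backoff.ms and
message.send.max.retries, and the ProducerSendThread in the logger name
means we're running the async producer.  Roughly the relevant producer
config, as a sketch only; the host name is a placeholder and the acks
setting is just a guess, not something confirmed above:

import java.util.Properties;

import kafka.producer.ProducerConfig;

public class RetryConfigSketch {
    // Roughly the settings implied by the client log above.
    public static ProducerConfig buildConfig() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "mykafkavip:12345"); // placeholder VIP
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");          // ProducerSendThread => async producer
        props.put("message.send.max.retries", "3");   // matches "Remaining retries = 3"
        props.put("retry.backoff.ms", "100");         // matches "Back off for 100 ms"
        props.put("request.required.acks", "1");      // assumption: not confirmed in this thread
        return new ProducerConfig(props);
    }
}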


On Sun, Jun 23, 2013 at 1:55 AM, Sriram Subramanian <
srsubraman...@linkedin.com> wrote:

> Hey Jason,
>
> The producer on failure initiates a metadata request to refresh its state
> and should issue subsequent requests to the new leader. The errors that
> you see should only happen once per topic partition per producer. Let me
> know if this is not what you see. On the producer end you should see the
> following info logging -
>
> "Back off for x ms before retrying send. Remaining retries = y"
>
> If all the retries of the producer fail, you should see the error message
> below -
>
> "Failed to send requests for topics"
>
>
>
> On 6/23/13 1:45 AM, "Jason Rosenberg" <j...@squareup.com> wrote:
>
> >I'm working on getting seamless rolling restarts for my kafka servers,
> >running 0.8.  I have it so that each server is restarted sequentially.
> >Each server takes itself out of the load balancer (e.g. it sets a status
> >that the lb will recognize, and then waits more than long enough for the
> >lb to stop sending metadata requests to that server).  Then I initiate
> >the shutdown (with controlled.shutdown.enable=true).  This seems to work
> >well; however, I occasionally see warnings like this in the log from the
> >server, after restart:
> >
> >2013-06-23 08:28:46,770  WARN [kafka-request-handler-2] server.KafkaApis -
> >[KafkaApi-508818741] Produce request with correlation id 7136261 from
> >client  on partition [mytopic,0] failed due to Leader not local for
> >partition [mytopic,0] on broker 508818741
> >
> >This WARN repeats persistently until the producer client initiates a new
> >metadata request (e.g. every 10 minutes, by default).  However, the
> >producer doesn't log any errors/exceptions while the server is logging
> >this WARN.
> >
> >What's happening here?  Is the message silently being forwarded on to
> >the correct leader for the partition?  Is the message dropped?  Are
> >these WARNs particularly useful?
> >
> >Thanks,
> >
> >Jason
>
>
