[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199132 ]

schandr commented on KAFKA-1738:
--------------------------------

OK, here is my understanding. This is probably a bug.

1. For every request the controller sends to a broker, it uses a BlockingChannel 
that is initialized with the controller.socket.timeout.ms value from 
server.properties (see the sketch right after this list).
2. Once the channel is established, the RequestSendThread uses it to send 
requests such as LeaderAndIsr and UpdateMetadata without checking whether the 
channel is still open.
3. Depending on that timeout value, the socket may have timed out in the 
meantime. The RequestSendThread code quoted below catches the resulting 
exception and calls connectToBroker again, but does not resend the failed 
request. If that request happened to be the LeaderAndIsr for a new partition, 
the broker never creates the log directory for it, and producers and consumers 
later fail with exceptions when they try to produce to or consume from that 
topic.
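
For reference, this is roughly how the controller-to-broker channel is set up 
with that timeout (a sketch based on ControllerChannelManager/BlockingChannel 
in 0.8.x; the exact constructor arguments and config field names are 
approximations, not verified against the source):

        // Sketch: the controller creates one BlockingChannel per broker, passing
        // controller.socket.timeout.ms (config.controllerSocketTimeoutMs) as the
        // socket read timeout. Names are approximate.
        val channel = new BlockingChannel(broker.host, broker.port,
          BlockingChannel.UseDefaultBufferSize,
          BlockingChannel.UseDefaultBufferSize,
          config.controllerSocketTimeoutMs)
        channel.connect()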




        var isSendSuccessful = false
        while (isRunning.get() && !isSendSuccessful) {
          // if a broker goes down for a long time, then at some point the controller's
          // zookeeper listener will trigger a removeBroker which will invoke shutdown()
          // on this thread. At that point, we will stop retrying.
          try {
            channel.send(request)
            isSendSuccessful = true
          } catch {
            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
              error(("Controller %d epoch %d failed to send request %s to broker %s. " +
                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                request.toString, toBroker.toString()), e)
              channel.disconnect()
              connectToBroker(toBroker, channel)
              isSendSuccessful = false
              // backoff before retrying the connection and send
              Utils.swallow(Thread.sleep(300))
          }
        }


In the code above, after reconnecting to the broker, it should also resend the 
failed request; a sketch of that change follows.
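
A minimal sketch of that resend-after-reconnect behaviour, reusing the channel, 
request, toBroker, connectToBroker, isRunning and Utils names from the snippet 
above (illustrative only, not the committed patch):

        // Sketch: after a failed send, reconnect and then resend the SAME request,
        // so a LeaderAndIsr/UpdateMetadata request is never silently dropped.
        var resent = false
        while (isRunning.get() && !resent) {
          try {
            channel.send(request)                 // resend the failed request
            resent = true
          } catch {
            case e: Throwable =>
              channel.disconnect()
              connectToBroker(toBroker, channel)  // re-establish the BlockingChannel
              Utils.swallow(Thread.sleep(300))    // back off, then retry the same request
          }
        }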

> Partitions for topic not created after restart from forced shutdown
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1738
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1738
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8.1.1, 0.8.2
>         Environment: Linux, 2GB RAM, 2 Core CPU
>            Reporter: Pradeep
>         Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, 
> ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt
>
>
> We are using the Kafka topic APIs to create the topic. In some cases the 
> topic gets created, but we don't see the partition-specific files, and when 
> the producer/consumer tries to get the topic metadata it fails with an 
> exception. The same happens if one tries to create the topic using the 
> command line.
> k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for 
> topic tloader1 -> No partition metadata for topic tloader1 due to 
> kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class 
> kafka.common.UnknownTopicOrPartitionException
> Steps to reproduce:
> 1.      Stop Kafka using kill -9 <PID of Kafka>
> 2.      Start Kafka
> 3.      Create a topic with 1 partition and a replication factor of 1.
> 4.      Check the response "Created topic <topic_name>"
> 5.      Run the list command to verify that it is created.
> 6.      Now check the Kafka data directory. There will be no directory for 
> the newly created topic.
> We see issues when creating new topics. This happens randomly and we don't 
> know the exact reason. We see the logs below in the controller at the time 
> of creating the topics that are missing the partition files.
> [2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for 
> [JobJTopic,0] (kafka.controller.KafkaController)
> [2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation 
> callback for [JobJTopic,0] (kafka.controller.KafkaController)
> [2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: 
> Invoking state change to NewPartition for partitions [JobJTopic,0] 
> (kafka.controller.PartitionStateMachine)
> [2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: 
> Invoking state change to NewReplica for replicas 
> [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
> [2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: 
> Invoking state change to OnlinePartition for partitions [JobJTopic,0] 
> (kafka.controller.PartitionStateMachine)
> [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: 
> Live assigned replicas for partition [JobJTopic,0] are: [List(0)] 
> (kafka.controller.PartitionStateMachine)
> [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: 
> Initializing leader and isr for partition [JobJTopic,0] to 
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) 
> (kafka.controller.PartitionStateMachine)
> [2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: 
> Invoking state change to OnlineReplica for replicas 
> [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
> [2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], 
> Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 
> (kafka.controller.RequestSendThread)
> java.io.EOFException: Received -1 when reading from channel, socket has 
> likely been closed.
>       at kafka.utils.Utils$.read(Utils.scala:381)
>       at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>       at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>       at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>       at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
>       at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> [2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], 
> Controller 0 epoch 2 failed to send request 
> Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0]
>  -> 
> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0)
>  to broker id:0,host:DMIPVM,port:9092. Reconnecting to broker. 
> (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
>       at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
>       at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>       at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
