[ https://issues.apache.org/jira/browse/KAFKA-6404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441820#comment-16441820 ]

Manikumar commented on KAFKA-6404:
----------------------------------

SocketServer error handling was improved in KAFKA-5607, so this issue should 
have been fixed by that change. 

> OldConsumer FetchRequest apiVersion mismatch resulting in broker 
> RequestHandler socket leak
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6404
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6404
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.0.1
>            Reporter: Yu Gan
>            Priority: Critical
>
> *kafka broker version*: 0.10.0.1
> *cluster info*: 200+ nodes, no ACLs, any client in the same LAN can access it
> *situation*: someone runs a newer release (such as 0.11.x) of 
> bin/kafka-console-consumer.sh with the "--zookeeper" parameter to continuously 
> consume a topic whose partitions are spread across all the brokers
> *phenomenon*: 
> 1. broker server log, with errors like:
> 1) Connection to 2 was disconnected before the response was read;
> 2) Shrinking ISR for partition [abc, 21] from 33,13,14 to 33;
> 3) ERROR Processor got uncaught exception. (kafka.network.Processor) 
> java.nio.BufferUnderflowException
> 2. ordinary consumers stuck in rebalancing, with errors like:
> 1) c.p.b.f.l.c.FiberTopoWorkerThread : got uncaught exception
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member. This means that the time between subsequent calls to 
> poll() was longer than the configured session.timeout.ms, which typically 
> implies that the poll loop is spending too much time message processing. You 
> can address this either by increasing the session timeout or by reducing the 
> maximum size of batches returned in poll() with max.poll.records.
> 2) java.lang.IllegalStateException: Correlation id for response (1246203) 
> does not match request (1246122)
> *bad results*: the Kafka brokers become unhealthy
> *root cause*: 
> 1) since 0.10.1, the old consumer's ConsumerFetcherThread.scala sets 
> requestVersion to 3:
> {code:java}
> private val fetchRequestBuilder = new FetchRequestBuilder().
>     clientId(clientId).
>     replicaId(Request.OrdinaryConsumerId).
>     maxWait(config.fetchWaitMaxMs).
>     minBytes(config.fetchMinBytes).
>     requestVersion(3) // for now, the old consumer is pinned to the old message format through the fetch request
> {code}
> but in 0.10.0.1 FetchRequest.CurrentVersion = 2, so FetchRequest.readFrom does 
> not read the version-3 field "max_bytes"; parsing falls out of alignment and 
> throws "BufferUnderflowException":
> {code:java}
>   def readFrom(buffer: ByteBuffer): FetchRequest = {
>     val versionId = buffer.getShort
>     val correlationId = buffer.getInt
>     val clientId = readShortString(buffer)
>     val replicaId = buffer.getInt
>     val maxWait = buffer.getInt
>     val minBytes = buffer.getInt
>     val topicCount = buffer.getInt
>     val pairs = (1 to topicCount).flatMap(_ => {
>       val topic = readShortString(buffer)
>       val partitionCount = buffer.getInt
>       (1 to partitionCount).map(_ => {
>         val partitionId = buffer.getInt
>         val offset = buffer.getLong
>         val fetchSize = buffer.getInt
>         (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize))
>       })
>     })
>     FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
>   }
> {code}
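> As a hedged, self-contained sketch of that mechanism (not Kafka code; the 
> object and values below are made up, only the field order follows the v3 
> schema): a v2-style parser reads the extra "max_bytes" int as the topic 
> count and then runs off the end of the buffer:
> {code:java}
> import java.nio.{BufferUnderflowException, ByteBuffer}
> 
> object V3UnderflowDemo extends App {
>   // Tail of a v3-style fetch request: v3 inserts max_bytes between
>   // min_bytes and the topic array.
>   val buf = ByteBuffer.allocate(32)
>   buf.putInt(100)      // max_wait
>   buf.putInt(1)        // min_bytes
>   buf.putInt(52428800) // max_bytes -- the field a v2 parser does not expect
>   buf.putInt(0)        // topic_count (the real end of the request)
>   buf.flip()
> 
>   // Parse it the v2 way, as readFrom above does:
>   buf.getInt()                  // max_wait
>   buf.getInt()                  // min_bytes
>   val topicCount = buf.getInt() // misreads max_bytes as topic_count
>   try {
>     (1 to topicCount).foreach(_ => buf.getShort()) // readShortString's length prefix, etc.
>   } catch {
>     case e: BufferUnderflowException =>
>       println(s"misread topic_count=$topicCount, then: $e")
>   }
> }
> {code}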
> 2) when FetchRequest.readFrom crashes with a throwable such as 
> "BufferUnderflowException" that is not in (InvalidRequestException, 
> SchemaException), the socket is not closed; see 
> SocketServer.processCompletedReceives:
> {code:java}
>   private def processCompletedReceives() {
>     selector.completedReceives.asScala.foreach { receive =>
>       try {
>         val openChannel = selector.channel(receive.source)
>         // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
>         val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
>         val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
>         val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
>           buffer = receive.payload, startTimeNanos = time.nanoseconds,
>           listenerName = listenerName, securityProtocol = securityProtocol)
>         requestChannel.sendRequest(req)
>         selector.mute(receive.source)
>       } catch {
>         case e @ (_: InvalidRequestException | _: SchemaException) =>
>           // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
>           error(s"Closing socket for ${receive.source} because of error", e)
>           close(selector, receive.source)
>       }
>     }
>   }
> {code}
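> To make the leak concrete, here is a minimal standalone sketch (all names 
> hypothetical; IllegalArgumentException stands in for the whitelisted 
> InvalidRequestException): any other throwable escapes the narrow catch, so 
> close() never runs for that connection:
> {code:java}
> import java.nio.{BufferUnderflowException, ByteBuffer}
> 
> object SocketLeakDemo extends App {
>   var socketClosed = false
> 
>   // Stand-in for building RequestChannel.Request, which calls
>   // FetchRequest.readFrom and underflows on a v3 payload as shown above.
>   def buildRequest(payload: ByteBuffer): Unit = payload.getLong() // needs 8 bytes
> 
>   val payload = ByteBuffer.allocate(4) // deliberately too short
>   try {
>     try buildRequest(payload)
>     catch {
>       // the narrow whitelist, stood in for by a single exception type:
>       case _: IllegalArgumentException => socketClosed = true
>     }
>   } catch {
>     case e: BufferUnderflowException =>
>       // escaped the narrow catch: the Processor logs "got uncaught exception",
>       // close() never runs, and the connection leaks
>       println(s"socketClosed=$socketClosed after $e")
>   }
> }
> {code}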
> *workaround, though not the optimal fix*:
> throw a known InvalidRequestException (or SchemaException, arguably more 
> suitable) in RequestChannel.scala:
> {code:java}
>     /*// TODO: this will be removed once we migrated to client-side format
>     // for server-side request / response format
>     // NOTE: this map only includes the server-side request/response handlers.
>     // Newer request types should only use the client-side versions which are
>     // parsed with o.a.k.common.requests.AbstractRequest.getRequest()
>     private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse] =
>       Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
>         ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
>       )
>     // TODO: this will be removed once we migrated to client-side format
>     val requestObj =
>       keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull*/
> 
>     // wrap the legacy deserializer so any parse failure surfaces as an
>     // exception type that SocketServer already closes the socket for
>     def getFetchRequest(): FetchRequest = {
>       try {
>         FetchRequest.readFrom(buffer)
>       } catch {
>         case ex: Throwable =>
>           throw new InvalidRequestException(s"FetchRequest version for API key $requestId does not match the server's ${FetchRequest.CurrentVersion}", ex)
>       }
>     }
> 
>     val requestObj: RequestOrResponse = requestId match {
>       case ApiKeys.FETCH.id => getFetchRequest()
>       case ApiKeys.CONTROLLED_SHUTDOWN_KEY.id => ControlledShutdownRequest.readFrom(buffer)
>       case _ => null
>     }
> {code}
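> The effect of this workaround can be sketched in the same standalone style 
> (hypothetical names again; IllegalArgumentException stands in for 
> InvalidRequestException): wrapping readFrom converts any parse throwable into 
> a whitelisted type, so the existing narrow catch in SocketServer closes the 
> socket:
> {code:java}
> import java.nio.ByteBuffer
> 
> object WorkaroundDemo extends App {
>   var socketClosed = false
> 
>   def readFrom(payload: ByteBuffer): Unit = payload.getLong() // underflows on short input
> 
>   // the workaround: rethrow anything from readFrom as the whitelisted type
>   def getFetchRequest(payload: ByteBuffer): Unit =
>     try readFrom(payload)
>     catch {
>       case ex: Throwable =>
>         throw new IllegalArgumentException("FetchRequest version does not match the server's", ex)
>     }
> 
>   try getFetchRequest(ByteBuffer.allocate(4))
>   catch {
>     case e: IllegalArgumentException =>
>       socketClosed = true // the SocketServer catch now fires and closes the socket
>       println(s"socketClosed=$socketClosed: ${e.getMessage}")
>   }
> }
> {code}
> Broadening the catch in SocketServer itself, as the error-handling rework in 
> KAFKA-5607 reportedly does, avoids having to wrap every legacy readFrom this way.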
>  


