[ https://issues.apache.org/jira/browse/KAFKA-6404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441820#comment-16441820 ]
Manikumar commented on KAFKA-6404:
----------------------------------

SocketServer error handling was improved in KAFKA-5607, so this should already be handled by that change.

> OldConsumer FetchRequest apiVersion not match resulting in broker
> RequestHandler socket leak
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6404
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6404
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.0.1
>            Reporter: Yu Gan
>            Priority: Critical
>
> *kafka broker version*: 0.10.0.1
> *cluster info*: 200+ nodes, no ACLs; any client in the same LAN can access the cluster
> *situation*: someone uses a newer release (such as 0.11.x) of
> bin/kafka-console-consumer.sh with the "--zookeeper" parameter to continuously
> consume a topic whose partitions are spread across all the brokers
> *phenomenon*:
> 1. broker server log:
> errors like:
> 1) Connection to 2 was disconnected before the response was read;
> 2) Shrinking ISR for partition [abc, 21] from 33,13,14 to 33;
> 3) ERROR Processor got uncaught exception. (kafka.network.Processor)
> java.nio.BufferUnderflowException
> 2. regular consumers stuck in rebalancing:
> errors like:
> 1) c.p.b.f.l.c.FiberTopoWorkerThread : got uncaught exception
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the partitions
> to another member. This means that the time between subsequent calls to
> poll() was longer than the configured session.timeout.ms, which typically
> implies that the poll loop is spending too much time message processing. You
> can address this either by increasing the session timeout or by reducing the
> maximum size of batches returned in poll() with max.poll.records.
> 2) java.lang.IllegalStateException: Correlation id for response (1246203)
> does not match request (1246122)
> *bad results*: Kafka brokers become unhealthy
> *root cause*:
> 1) After 0.10.1, the OldConsumer sets requestVersion 3 in
> ConsumerFetcherThread.scala:
> {code:java}
> private val fetchRequestBuilder = new FetchRequestBuilder().
>   clientId(clientId).
>   replicaId(Request.OrdinaryConsumerId).
>   maxWait(config.fetchWaitMaxMs).
>   minBytes(config.fetchMinBytes).
>   requestVersion(3) // for now, the old consumer is pinned to the old message format through the fetch request
> {code}
> But in 0.10.0.1 FetchRequest.CurrentVersion=2, so FetchRequest.readFrom does
> not read the "max_bytes" field added in version 3 and then throws
> "BufferUnderflowException":
> {code:java}
> def readFrom(buffer: ByteBuffer): FetchRequest = {
>   val versionId = buffer.getShort
>   val correlationId = buffer.getInt
>   val clientId = readShortString(buffer)
>   val replicaId = buffer.getInt
>   val maxWait = buffer.getInt
>   val minBytes = buffer.getInt
>   val topicCount = buffer.getInt
>   val pairs = (1 to topicCount).flatMap(_ => {
>     val topic = readShortString(buffer)
>     val partitionCount = buffer.getInt
>     (1 to partitionCount).map(_ => {
>       val partitionId = buffer.getInt
>       val offset = buffer.getLong
>       val fetchSize = buffer.getInt
>       (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize))
>     })
>   })
>   FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
> }
> {code}
> 2) When FetchRequest.readFrom fails with a throwable such as
> "BufferUnderflowException" that is not one of "(InvalidRequestException,
> SchemaException)", the socket is not closed.
> SocketServer.processCompletedReceives:
> {code:java}
> private def processCompletedReceives() {
>   selector.completedReceives.asScala.foreach { receive =>
>     try {
>       val openChannel = selector.channel(receive.source)
>       // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
>       val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
>       val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
>         openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
>       val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
>         buffer = receive.payload, startTimeNanos = time.nanoseconds,
>         listenerName = listenerName, securityProtocol = securityProtocol)
>       requestChannel.sendRequest(req)
>       selector.mute(receive.source)
>     } catch {
>       case e @ (_: InvalidRequestException | _: SchemaException) =>
>         // note that even though we got an exception, we can assume that receive.source is valid.
>         // Issues with constructing a valid receive object were handled earlier
>         error(s"Closing socket for ${receive.source} because of error", e)
>         close(selector, receive.source)
>     }
>   }
> }
> {code}
> *workaround (not optimal)*:
> throw a known InvalidRequestException (or, more suitably, a SchemaException) in
> RequestChannel.scala:
> {code:java}
> /*// TODO: this will be removed once we migrated to client-side format
> // for server-side request / response format
> // NOTE: this map only includes the server-side request/response handlers. Newer
> // request types should only use the client-side versions which are parsed with
> // o.a.k.common.requests.AbstractRequest.getRequest()
> private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
>   Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
>     ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
>   )
> // TODO: this will be removed once we migrated to client-side format
> val requestObj =
>   keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull*/
> val requestObj: RequestOrResponse = requestId match {
>   case ApiKeys.FETCH.id => getFetchRequest()
>   case ApiKeys.CONTROLLED_SHUTDOWN_KEY.id => ControlledShutdownRequest.readFrom(buffer)
>   case _ => null
> }
> def getFetchRequest(): FetchRequest = {
>   try {
>     FetchRequest.readFrom(buffer)
>   } catch {
>     case ex: Throwable =>
>       throw new InvalidRequestException(s"FetchRequest version for API key not match server's " + requestId + ": " + FetchRequest.CurrentVersion, ex)
>   }
> }
> {code}
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
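The version mismatch described under "root cause" can be reproduced in isolation. The following is a minimal sketch, not Kafka code: it assumes a simplified request layout that mirrors the fields read by the quoted FetchRequest.readFrom, and the class and method names (FetchVersionMismatchSketch, encode, parseAsV2, underflows) are hypothetical. It writes a version 3 body with the extra max_bytes int and parses it with the version 2 layout, so max_bytes is misread as the topic count and the parser runs past the end of the buffer.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of the fetch request body; not the Kafka wire format.
class FetchVersionMismatchSketch {

    // Encodes one topic with one partition. For version >= 3 an extra int
    // (max_bytes) is written right after min_bytes.
    static ByteBuffer encode(short version, int maxBytes) {
        byte[] clientId = "console-consumer".getBytes(StandardCharsets.UTF_8);
        byte[] topic = "abc".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(256);
        buf.putShort(version);
        buf.putInt(1246122);                                   // correlation id
        buf.putShort((short) clientId.length).put(clientId);   // client id
        buf.putInt(-1);                                        // replica id
        buf.putInt(100);                                       // max wait
        buf.putInt(1);                                         // min bytes
        if (version >= 3) {
            buf.putInt(maxBytes);                              // field unknown to a v2 parser
        }
        buf.putInt(1);                                         // topic count
        buf.putShort((short) topic.length).put(topic);         // topic name
        buf.putInt(1);                                         // partition count
        buf.putInt(21);                                        // partition id
        buf.putLong(0L);                                       // offset
        buf.putInt(1024 * 1024);                               // fetch size
        buf.flip();
        return buf;
    }

    // Reads the buffer with the version 2 layout (no max_bytes) and returns
    // the number of partitions parsed.
    static int parseAsV2(ByteBuffer buf) {
        buf.getShort();                 // version id
        buf.getInt();                   // correlation id
        skipShortString(buf);           // client id
        buf.getInt();                   // replica id
        buf.getInt();                   // max wait
        buf.getInt();                   // min bytes
        int topicCount = buf.getInt();  // on a v3 body this is really max_bytes
        int partitions = 0;
        for (int t = 0; t < topicCount; t++) {
            skipShortString(buf);
            int partitionCount = buf.getInt();
            for (int p = 0; p < partitionCount; p++) {
                buf.getInt();           // partition id
                buf.getLong();          // offset
                buf.getInt();           // fetch size
                partitions++;
            }
        }
        return partitions;
    }

    static void skipShortString(ByteBuffer buf) {
        short len = buf.getShort();
        buf.get(new byte[len]);
    }

    // True if parsing with the v2 layout runs off the end of the buffer.
    static boolean underflows(ByteBuffer buf) {
        try {
            parseAsV2(buf);
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("v2 body partitions: " + parseAsV2(encode((short) 2, 0)));
        System.out.println("v3 body underflows: " + underflows(encode((short) 3, 50 * 1024 * 1024)));
    }
}
```

In this sketch a version 2 body parses cleanly, while the version 3 body ends in java.nio.BufferUnderflowException, the same exception type reported in the broker log. Because that exception is neither InvalidRequestException nor SchemaException, the quoted catch clause in processCompletedReceives would not close the socket for it.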