chia7712 commented on a change in pull request #9850: URL: https://github.com/apache/kafka/pull/9850#discussion_r554525446
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3242,6 +3209,133 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleEnvelope(request: RequestChannel.Request): Unit = { + val envelope = request.body[EnvelopeRequest] + + if (!config.metadataQuorumEnabled) { + // If forwarding is not yet enabled, we treat the request as unparsable and close the connection + closeConnection(request, Collections.emptyMap()) + return + } else if (!request.context.fromPrivilegedListener) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( Review comment: Not sure whether ```ClusterAuthorizationException``` suits this case. How about adding a new, specific exception for this case? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3242,6 +3209,133 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleEnvelope(request: RequestChannel.Request): Unit = { + val envelope = request.body[EnvelopeRequest] + + if (!config.metadataQuorumEnabled) { + // If forwarding is not yet enabled, we treat the request as unparsable and close the connection + closeConnection(request, Collections.emptyMap()) + return + } else if (!request.context.fromPrivilegedListener) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( + s"Invalid envelope request on unprivileged listener ${request.context.listenerName}")) + return + } else if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( + s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope")) + return + } else if (!controller.isActive) { + sendErrorResponseMaybeThrottle(request, new NotControllerException( + s"Broker $brokerId is not the active controller")) + return + } + + try { + val forwardedPrincipal = parseForwardedPrincipal(request).getOrElse { + throw new 
PrincipalDeserializationException("Failed to parse client principal from envelope") + } + + val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress).getOrElse { + throw new InvalidRequestException("Failed to parse client address from envelope") + } + + // Note that any failure to parse the embedded request or its header is treated as + // an UNSUPPORTED_VERSION error rather than INVALID_REQUEST. The purpose + // is to disambiguate structural errors in the envelope request itself, such + // as an invalid client address. + + val forwardedRequestBuffer = envelope.requestData.duplicate() + val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer).getOrElse { + throw new UnsupportedVersionException("Failed to parse request header from envelope") + } + + val forwardedApi = forwardedRequestHeader.apiKey + if (!forwardedApi.forwardable || !forwardedApi.isEnabled) { + throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding") + } + + val forwardedContext = new RequestContext( + forwardedRequestHeader, + request.context.connectionId, + forwardedClientAddress, + forwardedPrincipal, + request.context.listenerName, + request.context.securityProtocol, + ClientInformation.EMPTY, + request.context.fromPrivilegedListener + ) + + val forwardedRequest = parseForwardedRequest(request, forwardedContext, forwardedRequestBuffer).getOrElse { + throw new UnsupportedVersionException(s"Failed to parse forwarded request with header $forwardedRequestHeader") + } + + handle(forwardedRequest) + } catch { + case e: KafkaException => + debug(s"Failed to handle envelope request $request", e) + sendErrorResponseMaybeThrottle(request, e) + } + } + + private def parseForwardedClientAddress( + address: Array[Byte] + ): Option[InetAddress] = { + try { + Some(InetAddress.getByAddress(address)) + } catch { + case e: UnknownHostException => + None + } + } + + private def parseForwardedRequest( + envelope: 
RequestChannel.Request, + forwardedContext: RequestContext, + buffer: ByteBuffer + ): Option[RequestChannel.Request] = { + try { + Some(new RequestChannel.Request( + processor = envelope.processor, + context = forwardedContext, + startTimeNanos = envelope.startTimeNanos, + envelope.memoryPool, + buffer, + requestChannel.metrics, + Some(envelope) + )) + } catch { + case e: InvalidRequestException => + None + } + } + + private def parseForwardedRequestHeader( + buffer: ByteBuffer + ): Option[RequestHeader] = { + try { + Some(RequestHeader.parse(buffer)) + } catch { + case e: InvalidRequestException => + None + } + } + + private def parseForwardedPrincipal( + envelopeRequest: RequestChannel.Request + ): Option[KafkaPrincipal] = { + val envelope = envelopeRequest.body[EnvelopeRequest] + try { + val principalSerde = envelopeRequest.context.principalSerde.asScala + principalSerde.map { serde => + serde.deserialize(envelope.requestPrincipal) + } + } catch { + case e: Exception => + warn(s"Failed to deserialize principal from envelope request $envelope", e) Review comment: We can convert the exception to ```throw new PrincipalDeserializationException("Failed to parse client principal from envelope")``` here to avoid the redundant try-catch (the other one is in handleEnvelope) and the unnecessary ```Option``` wrapping. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3242,6 +3209,133 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleEnvelope(request: RequestChannel.Request): Unit = { + val envelope = request.body[EnvelopeRequest] + + if (!config.metadataQuorumEnabled) { + // If forwarding is not yet enabled, we treat the request as unparsable and close the connection + closeConnection(request, Collections.emptyMap()) + return + } else if (!request.context.fromPrivilegedListener) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( + s"Invalid envelope request on unprivileged listener ${request.context.listenerName}")) + return + } else if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( + s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope")) + return + } else if (!controller.isActive) { + sendErrorResponseMaybeThrottle(request, new NotControllerException( + s"Broker $brokerId is not the active controller")) + return + } + + try { + val forwardedPrincipal = parseForwardedPrincipal(request).getOrElse { + throw new PrincipalDeserializationException("Failed to parse client principal from envelope") + } + + val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress).getOrElse { + throw new InvalidRequestException("Failed to parse client address from envelope") + } + + // Note that any failure to parse the embedded request or its header is treated as + // an UNSUPPORTED_VERSION error rather than INVALID_REQUEST. The purpose + // is to disambiguate structural errors in the envelope request itself, such + // as an invalid client address. 
+ + val forwardedRequestBuffer = envelope.requestData.duplicate() + val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer).getOrElse { + throw new UnsupportedVersionException("Failed to parse request header from envelope") + } + + val forwardedApi = forwardedRequestHeader.apiKey + if (!forwardedApi.forwardable || !forwardedApi.isEnabled) { + throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding") + } + + val forwardedContext = new RequestContext( + forwardedRequestHeader, + request.context.connectionId, + forwardedClientAddress, + forwardedPrincipal, + request.context.listenerName, + request.context.securityProtocol, + ClientInformation.EMPTY, + request.context.fromPrivilegedListener + ) + + val forwardedRequest = parseForwardedRequest(request, forwardedContext, forwardedRequestBuffer).getOrElse { + throw new UnsupportedVersionException(s"Failed to parse forwarded request with header $forwardedRequestHeader") + } + + handle(forwardedRequest) + } catch { + case e: KafkaException => + debug(s"Failed to handle envelope request $request", e) + sendErrorResponseMaybeThrottle(request, e) + } + } + + private def parseForwardedClientAddress( + address: Array[Byte] + ): Option[InetAddress] = { + try { + Some(InetAddress.getByAddress(address)) + } catch { + case e: UnknownHostException => + None + } + } + + private def parseForwardedRequest( + envelope: RequestChannel.Request, + forwardedContext: RequestContext, + buffer: ByteBuffer + ): Option[RequestChannel.Request] = { + try { + Some(new RequestChannel.Request( + processor = envelope.processor, + context = forwardedContext, + startTimeNanos = envelope.startTimeNanos, + envelope.memoryPool, + buffer, + requestChannel.metrics, + Some(envelope) + )) + } catch { + case e: InvalidRequestException => + None + } + } + + private def parseForwardedRequestHeader( + buffer: ByteBuffer + ): Option[RequestHeader] = { + try { + Some(RequestHeader.parse(buffer)) 
+ } catch { + case e: InvalidRequestException => Review comment: ditto ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3242,6 +3209,133 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleEnvelope(request: RequestChannel.Request): Unit = { + val envelope = request.body[EnvelopeRequest] + + if (!config.metadataQuorumEnabled) { + // If forwarding is not yet enabled, we treat the request as unparsable and close the connection + closeConnection(request, Collections.emptyMap()) + return + } else if (!request.context.fromPrivilegedListener) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( + s"Invalid envelope request on unprivileged listener ${request.context.listenerName}")) + return + } else if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( + s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope")) + return + } else if (!controller.isActive) { + sendErrorResponseMaybeThrottle(request, new NotControllerException( + s"Broker $brokerId is not the active controller")) + return + } + + try { + val forwardedPrincipal = parseForwardedPrincipal(request).getOrElse { + throw new PrincipalDeserializationException("Failed to parse client principal from envelope") + } + + val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress).getOrElse { + throw new InvalidRequestException("Failed to parse client address from envelope") + } + + // Note that any failure to parse the embedded request or its header is treated as + // an UNSUPPORTED_VERSION error rather than INVALID_REQUEST. The purpose + // is to disambiguate structural errors in the envelope request itself, such + // as an invalid client address. 
+ + val forwardedRequestBuffer = envelope.requestData.duplicate() + val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer).getOrElse { + throw new UnsupportedVersionException("Failed to parse request header from envelope") + } + + val forwardedApi = forwardedRequestHeader.apiKey + if (!forwardedApi.forwardable || !forwardedApi.isEnabled) { + throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding") + } + + val forwardedContext = new RequestContext( + forwardedRequestHeader, + request.context.connectionId, + forwardedClientAddress, + forwardedPrincipal, + request.context.listenerName, + request.context.securityProtocol, + ClientInformation.EMPTY, + request.context.fromPrivilegedListener + ) + + val forwardedRequest = parseForwardedRequest(request, forwardedContext, forwardedRequestBuffer).getOrElse { + throw new UnsupportedVersionException(s"Failed to parse forwarded request with header $forwardedRequestHeader") + } + + handle(forwardedRequest) + } catch { + case e: KafkaException => + debug(s"Failed to handle envelope request $request", e) + sendErrorResponseMaybeThrottle(request, e) + } + } + + private def parseForwardedClientAddress( + address: Array[Byte] + ): Option[InetAddress] = { + try { + Some(InetAddress.getByAddress(address)) + } catch { + case e: UnknownHostException => + None + } + } + + private def parseForwardedRequest( + envelope: RequestChannel.Request, + forwardedContext: RequestContext, + buffer: ByteBuffer + ): Option[RequestChannel.Request] = { + try { + Some(new RequestChannel.Request( + processor = envelope.processor, + context = forwardedContext, + startTimeNanos = envelope.startTimeNanos, + envelope.memoryPool, + buffer, + requestChannel.metrics, + Some(envelope) + )) + } catch { + case e: InvalidRequestException => Review comment: ditto ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -186,77 +160,70 @@ class KafkaApis(val 
requestChannel: RequestChannel, trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" + s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}") - val handled = request.envelope.exists { envelope => Review comment: We don't need to check ```envelope``` for each request type now? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org