This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 645c1ba526 MINOR: Fix buildResponseSend test cases for envelope
responses (#12185)
645c1ba526 is described below
commit 645c1ba526ec11049429dc5e9ba347fc386df58e
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon May 30 11:34:36 2022 -0700
MINOR: Fix buildResponseSend test cases for envelope responses (#12185)
The test cases we have in `RequestChannelTest` for `buildResponseSend`
construct the envelope request incorrectly. The request is created using the
envelope context, but also with a reference to the wrapped envelope request object.
This patch fixes `TestUtils.buildEnvelopeRequest` so that the wrapped request
is built properly. It also removes the dependence on this incorrect construction
and consolidates the tests in `RequestChannelTest` to avoid duplication.
Reviewers: dengziming <[email protected]>, David Jacot
<[email protected]>
---
.../main/scala/kafka/server/EnvelopeUtils.scala | 3 +-
.../unit/kafka/network/RequestChannelTest.scala | 163 +++++++++++----------
.../scala/unit/kafka/server/KafkaApisTest.scala | 6 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 26 ++--
4 files changed, 98 insertions(+), 100 deletions(-)
diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala
b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
index ec8871f382..a162ae5fe8 100644
--- a/core/src/main/scala/kafka/server/EnvelopeUtils.scala
+++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
@@ -32,7 +32,8 @@ object EnvelopeUtils {
def handleEnvelopeRequest(
request: RequestChannel.Request,
requestChannelMetrics: RequestChannel.Metrics,
- handler: RequestChannel.Request => Unit): Unit = {
+ handler: RequestChannel.Request => Unit
+ ): Unit = {
val envelope = request.body[EnvelopeRequest]
val forwardedPrincipal = parseForwardedPrincipal(request.context,
envelope.requestPrincipal)
val forwardedClientAddress =
parseForwardedClientAddress(envelope.clientAddress)
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index f3f8ca884c..bddf03a136 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -18,42 +18,44 @@
package kafka.network
-import java.io.IOException
-import java.net.InetAddress
-import java.nio.ByteBuffer
-import java.util.Collections
import com.fasterxml.jackson.databind.ObjectMapper
import kafka.network
+import kafka.server.EnvelopeUtils
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs,
SslConfigs, TopicConfig}
import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
-import org.apache.kafka.common.network.{ByteBufferSend, ClientInformation,
ListenerName}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest,
RequestTestUtils}
+import org.apache.kafka.common.message.{CreateTopicsRequestData,
CreateTopicsResponseData, IncrementalAlterConfigsRequestData}
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AlterConfigsRequest._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.test
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
import org.mockito.Mockito.mock
-import org.mockito.{ArgumentCaptor, Mockito}
+import java.io.IOException
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicReference
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
class RequestChannelTest {
private val requestChannelMetrics: RequestChannel.Metrics =
mock(classOf[RequestChannel.Metrics])
- private val clientId = "id"
private val principalSerde = new KafkaPrincipalSerde() {
override def serialize(principal: KafkaPrincipal): Array[Byte] =
Utils.utf8(principal.toString)
override def deserialize(bytes: Array[Byte]): KafkaPrincipal =
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
}
- private val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
@Test
def testAlterRequests(): Unit = {
@@ -191,84 +193,66 @@ class RequestChannelTest {
assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
}
- @Test
- def
testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError():
Unit = {
- val channelRequest =
buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
- val envelopeResponseArgumentCaptor =
ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
- Mockito.doAnswer(_ => mockSend)
-
.when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
- // create an inner response without error
- val responseWithoutError = RequestTestUtils.metadataUpdateWith(2,
Collections.singletonMap("a", 2))
-
- // build an envelope response
- channelRequest.buildResponseSend(responseWithoutError)
-
- // expect the envelopeResponse result without error
- val capturedValue: EnvelopeResponse =
envelopeResponseArgumentCaptor.getValue
- assertTrue(capturedValue.error().equals(Errors.NONE))
+ @ParameterizedTest
+ @EnumSource(value=classOf[Errors], names=Array("NONE",
"CLUSTER_AUTHORIZATION_FAILED", "NOT_CONTROLLER"))
+ def testBuildEnvelopeResponse(error: Errors): Unit = {
+ val topic = "foo"
+ val createTopicRequest = buildCreateTopicRequest(topic)
+ val unwrapped = buildUnwrappedEnvelopeRequest(createTopicRequest)
+
+ val createTopicResponse = buildCreateTopicResponse(topic, error)
+ val envelopeResponse = buildEnvelopeResponse(unwrapped,
createTopicResponse)
+
+ error match {
+ case Errors.NOT_CONTROLLER =>
+ assertEquals(Errors.NOT_CONTROLLER, envelopeResponse.error)
+ assertNull(envelopeResponse.responseData)
+ case _ =>
+ assertEquals(Errors.NONE, envelopeResponse.error)
+ val unwrappedResponse =
AbstractResponse.parseResponse(envelopeResponse.responseData, unwrapped.header)
+ assertEquals(createTopicResponse.data, unwrappedResponse.data)
+ }
}
- @Test
- def
testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoNotControllerError():
Unit = {
- val channelRequest =
buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
- val envelopeResponseArgumentCaptor =
ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
- Mockito.doAnswer(_ => mockSend)
-
.when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
- // create an inner response with REQUEST_TIMED_OUT error
- val responseWithTimeoutError =
RequestTestUtils.metadataUpdateWith("cluster1", 2,
- Collections.singletonMap("a", Errors.REQUEST_TIMED_OUT),
- Collections.singletonMap("a", 2))
-
- // build an envelope response
- channelRequest.buildResponseSend(responseWithTimeoutError)
-
- // expect the envelopeResponse result without error
- val capturedValue: EnvelopeResponse =
envelopeResponseArgumentCaptor.getValue
- assertTrue(capturedValue.error().equals(Errors.NONE))
+ private def buildCreateTopicRequest(topic: String): CreateTopicsRequest = {
+ val requestData = new CreateTopicsRequestData()
+ requestData.topics.add(new CreatableTopic()
+ .setName(topic)
+ .setReplicationFactor(-1)
+ .setNumPartitions(-1)
+ )
+ new CreateTopicsRequest.Builder(requestData).build()
}
- @Test
- def
testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne():
Unit = {
- val channelRequest =
buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
- val envelopeResponseArgumentCaptor =
ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
- Mockito.doAnswer(_ => mockSend)
-
.when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
- // create an inner response with NOT_CONTROLLER error
- val responseWithNotControllerError =
RequestTestUtils.metadataUpdateWith("cluster1", 2,
- Collections.singletonMap("a", Errors.NOT_CONTROLLER),
- Collections.singletonMap("a", 2))
-
- // build an envelope response
- channelRequest.buildResponseSend(responseWithNotControllerError)
-
- // expect the envelopeResponse result has NOT_CONTROLLER error
- val capturedValue: EnvelopeResponse =
envelopeResponseArgumentCaptor.getValue
- assertTrue(capturedValue.error().equals(Errors.NOT_CONTROLLER))
+ private def buildCreateTopicResponse(
+ topic: String,
+ error: Errors,
+ ): CreateTopicsResponse = {
+ val responseData = new CreateTopicsResponseData()
+ responseData.topics.add(new CreateTopicsResponseData.CreatableTopicResult()
+ .setName(topic)
+ .setErrorCode(error.code)
+ )
+ new CreateTopicsResponse(responseData)
}
- private def buildMetadataRequest(): AbstractRequest = {
- val resourceName = "topic-1"
- val header = new RequestHeader(ApiKeys.METADATA,
ApiKeys.METADATA.latestVersion,
- clientId, 0)
+ private def buildUnwrappedEnvelopeRequest(request: AbstractRequest):
RequestChannel.Request = {
+ val wrappedRequest = TestUtils.buildEnvelopeRequest(
+ request,
+ principalSerde,
+ requestChannelMetrics,
+ System.nanoTime()
+ )
- new MetadataRequest.Builder(Collections.singletonList(resourceName),
true).build(header.apiVersion)
- }
+ val unwrappedRequest = new AtomicReference[RequestChannel.Request]()
- private def buildForwardRequestWithEnvelopeRequestAttached(request:
AbstractRequest): RequestChannel.Request = {
- val envelopeRequest = TestUtils.buildRequestWithEnvelope(
- request, principalSerde, requestChannelMetrics, System.nanoTime(),
shouldSpyRequestContext = true)
+ EnvelopeUtils.handleEnvelopeRequest(
+ wrappedRequest,
+ requestChannelMetrics,
+ request => unwrappedRequest.set(request)
+ )
- TestUtils.buildRequestWithEnvelope(
- request, principalSerde, requestChannelMetrics, System.nanoTime(),
envelope = Option(envelopeRequest))
+ unwrappedRequest.get()
}
private def isValidJson(str: String): Boolean = {
@@ -312,4 +296,23 @@ class RequestChannelTest {
private def toMap(config:
IncrementalAlterConfigsRequestData.AlterableConfigCollection): Map[String,
String] = {
config.asScala.map(e => e.name -> e.value).toMap
}
+
+ private def buildEnvelopeResponse(
+ unwrapped: RequestChannel.Request,
+ response: AbstractResponse
+ ): EnvelopeResponse = {
+ assertTrue(unwrapped.envelope.isDefined)
+ val envelope = unwrapped.envelope.get
+
+ val send = unwrapped.buildResponseSend(response)
+ val sendBytes = test.TestUtils.toBuffer(send)
+
+ // We need to read the size field before `parseResponse` below
+ val size = sendBytes.getInt
+ assertEquals(size, sendBytes.remaining())
+ val envelopeResponse = AbstractResponse.parseResponse(sendBytes,
envelope.header)
+
+ assertTrue(envelopeResponse.isInstanceOf[EnvelopeResponse])
+ envelopeResponse.asInstanceOf[EnvelopeResponse]
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 74722d5e49..cc34cabe05 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -307,7 +307,7 @@ class KafkaApisTest {
Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava,
false).build(requestHeader.apiVersion)
- val request = TestUtils.buildRequestWithEnvelope(
+ val request = TestUtils.buildEnvelopeRequest(
alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics,
time.nanoseconds())
val capturedResponse: ArgumentCaptor[AlterConfigsResponse] =
ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
@@ -341,7 +341,7 @@ class KafkaApisTest {
when(controller.isActive).thenReturn(true)
- val request = TestUtils.buildRequestWithEnvelope(
+ val request = TestUtils.buildEnvelopeRequest(
leaveGroupRequest, kafkaPrincipalSerde, requestChannelMetrics,
time.nanoseconds())
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
@@ -396,7 +396,7 @@ class KafkaApisTest {
val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava,
false)
.build(requestHeader.apiVersion)
- val request = TestUtils.buildRequestWithEnvelope(
+ val request = TestUtils.buildEnvelopeRequest(
alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics,
time.nanoseconds(), fromPrivilegedListener)
val capturedResponse: ArgumentCaptor[AbstractResponse] =
ArgumentCaptor.forClass(classOf[AbstractResponse])
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e097dbd620..ad3c34f960 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -76,7 +76,6 @@ import
org.apache.zookeeper.KeeperException.SessionExpiredException
import org.apache.zookeeper.ZooDefs._
import org.apache.zookeeper.data.ACL
import org.junit.jupiter.api.Assertions._
-import org.mockito.Mockito
import scala.annotation.nowarn
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -2172,14 +2171,13 @@ object TestUtils extends Logging {
}
}
- def buildRequestWithEnvelope(request: AbstractRequest,
- principalSerde: KafkaPrincipalSerde,
- requestChannelMetrics: RequestChannel.Metrics,
- startTimeNanos: Long,
- fromPrivilegedListener: Boolean = true,
- shouldSpyRequestContext: Boolean = false,
- envelope: Option[RequestChannel.Request] = None
- ): RequestChannel.Request = {
+ def buildEnvelopeRequest(
+ request: AbstractRequest,
+ principalSerde: KafkaPrincipalSerde,
+ requestChannelMetrics: RequestChannel.Metrics,
+ startTimeNanos: Long,
+ fromPrivilegedListener: Boolean = true
+ ): RequestChannel.Request = {
val clientId = "id"
val listenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
@@ -2195,22 +2193,18 @@ object TestUtils extends Logging {
RequestHeader.parse(envelopeBuffer)
- var requestContext = new RequestContext(envelopeHeader, "1",
InetAddress.getLocalHost,
+ val envelopeContext = new RequestContext(envelopeHeader, "1",
InetAddress.getLocalHost,
KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY,
fromPrivilegedListener, Optional.of(principalSerde))
- if (shouldSpyRequestContext) {
- requestContext = Mockito.spy(requestContext)
- }
-
new RequestChannel.Request(
processor = 1,
- context = requestContext,
+ context = envelopeContext,
startTimeNanos = startTimeNanos,
memoryPool = MemoryPool.NONE,
buffer = envelopeBuffer,
metrics = requestChannelMetrics,
- envelope = envelope
+ envelope = None
)
}