This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3ea2d91 MINOR: Fix concurrency bug in MetadataCache and Metadata
request when listeners inconsistent (#4374)
3ea2d91 is described below
commit 3ea2d912c82a021bf43ba7a26b9b714c2fc0e817
Author: Ismael Juma <[email protected]>
AuthorDate: Thu Jan 4 18:16:04 2018 +0000
MINOR: Fix concurrency bug in MetadataCache and Metadata request when
listeners inconsistent (#4374)
- Add missing locking/volatile in MetadataCache.aliveEndPoint
- Fix topic metadata not to throw BrokerNotAvailableException
when listeners are inconsistent. Add test verifying the fix. As
part of this fix, renamed Broker methods to follow Map
convention where the `get` version returns `Option`.
Reviewers: Jason Gustafson <[email protected]>
---
.../scala/kafka/admin/ConsumerGroupCommand.scala | 2 +-
core/src/main/scala/kafka/cluster/Broker.scala | 15 +++---
.../controller/ControllerChannelManager.scala | 6 +--
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 5 +-
.../main/scala/kafka/server/MetadataCache.scala | 10 ++--
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../main/scala/kafka/tools/UpdateOffsetsInZK.scala | 2 +-
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 8 +--
.../api/RequestResponseSerializationTest.scala | 2 +-
.../unit/kafka/cluster/BrokerEndPointTest.scala | 12 ++---
.../scala/unit/kafka/server/KafkaApisTest.scala | 63 ++++++++++++++++++++--
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
.../server/epoch/LeaderEpochIntegrationTest.scala | 2 +-
14 files changed, 96 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index e835510..426dbfd 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -505,7 +505,7 @@ object ConsumerGroupCommand extends Logging {
private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
try {
zkUtils.getBrokerInfo(brokerId)
-
.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
+
.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
.map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port,
10000, 100000, "ConsumerGroupCommand"))
.orElse(throw new BrokerNotAvailableException("Broker id %d does not
exist".format(brokerId)))
} catch {
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala
b/core/src/main/scala/kafka/cluster/Broker.scala
index 425eafc..0106982 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -47,13 +47,16 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack:
Option[String]) {
this(bep.id, bep.host, bep.port, listenerName, protocol)
}
- def getNode(listenerName: ListenerName): Node = {
- val endpoint = endPointsMap.getOrElse(listenerName,
- throw new BrokerEndPointNotAvailableException(s"End point with listener
name ${listenerName.value} not found for broker $id"))
- new Node(id, endpoint.host, endpoint.port, rack.orNull)
- }
+ def node(listenerName: ListenerName): Node =
+ getNode(listenerName).getOrElse {
+ throw new BrokerEndPointNotAvailableException(s"End point with listener
name ${listenerName.value} not found " +
+ s"for broker $id")
+ }
+
+ def getNode(listenerName: ListenerName): Option[Node] =
+ endPointsMap.get(listenerName).map(endpoint => new Node(id, endpoint.host,
endpoint.port, rack.orNull))
- def getBrokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
+ def brokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
val endpoint = endPointsMap.getOrElse(listenerName,
throw new BrokerEndPointNotAvailableException(s"End point with listener
name ${listenerName.value} not found for broker $id"))
new BrokerEndPoint(id, endpoint.host, endpoint.port)
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 1b25f5f..e389821 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -109,7 +109,7 @@ class ControllerChannelManager(controllerContext:
ControllerContext, config: Kaf
private def addNewBroker(broker: Broker) {
val messageQueue = new LinkedBlockingQueue[QueueItem]
debug(s"Controller ${config.brokerId} trying to connect to broker
${broker.id}")
- val brokerNode = broker.getNode(config.interBrokerListenerName)
+ val brokerNode = broker.node(config.interBrokerListenerName)
val logContext = new LogContext(s"[Controller id=${config.brokerId},
targetBrokerId=${brokerNode.idString}] ")
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
@@ -408,7 +408,7 @@ class ControllerBrokerRequestBatch(controller:
KafkaController, stateChangeLogge
}
val leaderIds =
leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b =>
leaderIds.contains(b.id)).map {
- _.getNode(controller.config.interBrokerListenerName)
+ _.node(controller.config.interBrokerListenerName)
}
val leaderAndIsrRequestBuilder = new
LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
controllerEpoch, leaderAndIsrPartitionStates.asJava, leaders.asJava)
@@ -436,7 +436,7 @@ class ControllerBrokerRequestBatch(controller:
KafkaController, stateChangeLogge
controllerContext.liveOrShuttingDownBrokers.map { broker =>
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName =
ListenerName.forSecurityProtocol(securityProtocol)
- val node = broker.getNode(listenerName)
+ val node = broker.node(listenerName)
val endPoints = Seq(new EndPoint(node.host, node.port,
securityProtocol, listenerName))
new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava,
broker.rack.orNull)
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index f52a720..758a305 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -975,7 +975,7 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, requestThrottleMs =>
new MetadataResponse(
requestThrottleMs,
- brokers.map(_.getNode(request.context.listenerName)).asJava,
+ brokers.flatMap(_.getNode(request.context.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index d073584..df06c1a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -374,10 +374,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
*/
private def controlledShutdown() {
- def node(broker: Broker): Node = {
- val brokerEndPoint =
broker.getBrokerEndPoint(config.interBrokerListenerName)
- new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
- }
+ def node(broker: Broker): Node =
broker.node(config.interBrokerListenerName)
val socketTimeoutMs = config.controllerSocketTimeoutMs
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index fd11bfa..b4a015d 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.requests.{MetadataResponse,
UpdateMetadataRequest
class MetadataCache(brokerId: Int) extends Logging {
private val cache = mutable.Map[String, mutable.Map[Int,
UpdateMetadataRequest.PartitionState]]()
- private var controllerId: Option[Int] = None
+ @volatile private var controllerId: Option[Int] = None
private val aliveBrokers = mutable.Map[Int, Broker]()
private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName,
Node]]()
private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -104,9 +104,11 @@ class MetadataCache(brokerId: Int) extends Logging {
}
def getAliveEndpoint(brokerId: Int, listenerName: ListenerName):
Option[Node] =
- aliveNodes.get(brokerId).map { nodeMap =>
- nodeMap.getOrElse(listenerName,
- throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId`
does not have listener with name `$listenerName`"))
+ inReadLock(partitionMetadataLock) {
+ aliveNodes.get(brokerId).map { nodeMap =>
+ nodeMap.getOrElse(listenerName,
+ throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId`
does not have listener with name `$listenerName`"))
+ }
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 593c876..f4bfe39 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1304,7 +1304,7 @@ class ReplicaManager(val config: KafkaConfig,
// we do not need to check if the leader exists again since this has
been done at the beginning of this process
val partitionsToMakeFollowerWithLeaderAndOffset =
partitionsToMakeFollower.map(partition =>
partition.topicPartition -> BrokerAndInitialOffset(
- metadataCache.getAliveBrokers.find(_.id ==
partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
+ metadataCache.getAliveBrokers.find(_.id ==
partition.leaderReplicaIdOpt.get).get.brokerEndPoint(config.interBrokerListenerName),
partition.getReplica().get.highWatermark.messageOffset)).toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 20f1db2..204f5c1 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -70,7 +70,7 @@ object UpdateOffsetsInZK extends Logging {
zkUtils.getBrokerInfo(broker) match {
case Some(brokerInfo) =>
- val brokerEndPoint =
brokerInfo.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ val brokerEndPoint =
brokerInfo.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
val consumer = new SimpleConsumer(brokerEndPoint.host,
brokerEndPoint.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
val topicAndPartition = TopicAndPartition(topic, partition)
val request = OffsetRequest(Map(topicAndPartition ->
PartitionOffsetRequestInfo(offsetOption, 1)))
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index b68379b..43d8ec8 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -109,7 +109,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
val listenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic1),
brokers.map(_.getBrokerEndPoint(listenerName)),
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic1),
brokers.map(_.brokerEndPoint(listenerName)),
"AddPartitionsTest-testIncrementPartitions", 2000, 0).topicsMetadata
val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
val partitionDataForTopic1 =
metaDataForTopic1.head.partitionsMetadata.sortBy(_.partitionId)
@@ -138,7 +138,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
-
brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+
brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
"AddPartitionsTest-testManualAssignmentOfReplicas", 2000,
0).topicsMetadata
val metaDataForTopic2 = metadata.filter(_.topic == topic2)
val partitionDataForTopic2 =
metaDataForTopic2.head.partitionsMetadata.sortBy(_.partitionId)
@@ -164,7 +164,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
val metadata = ClientUtils.fetchTopicMetadata(Set(topic3),
-
brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+
brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
"AddPartitionsTest-testReplicaPlacementAllServers", 2000,
0).topicsMetadata
val metaDataForTopic3 = metadata.find(p => p.topic == topic3).get
@@ -187,7 +187,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
-
brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+
brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
"AddPartitionsTest-testReplicaPlacementPartialServers", 2000,
0).topicsMetadata
val metaDataForTopic2 = metadata.find(p => p.topic == topic2).get
diff --git
a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 026786e..4bcf61d 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -158,7 +158,7 @@ object SerializationTestUtils {
def createConsumerMetadataResponse: GroupCoordinatorResponse = {
GroupCoordinatorResponse(Some(
-
brokers.head.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
Errors.NONE, 0)
+
brokers.head.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
Errors.NONE, 0)
}
}
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index c60a7ed..380b5ad 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -59,7 +59,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, brokerInfoStr)
assertEquals(1, broker.id)
- val brokerEndPoint =
broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+ val brokerEndPoint =
broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
assertEquals("localhost", brokerEndPoint.host)
assertEquals(9093, brokerEndPoint.port)
}
@@ -76,7 +76,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, brokerInfoStr)
assertEquals(1, broker.id)
- val brokerEndPoint =
broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ val brokerEndPoint =
broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
assertEquals("localhost", brokerEndPoint.host)
assertEquals(9092, brokerEndPoint.port)
}
@@ -86,7 +86,7 @@ class BrokerEndPointTest {
val brokerInfoStr =
"""{"jmx_port":-1,"timestamp":"1420485325400","host":"172.16.8.243","version":1,"port":9091}"""
val broker = parseBrokerJson(1, brokerInfoStr)
assertEquals(1, broker.id)
- val brokerEndPoint =
broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ val brokerEndPoint =
broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
assertEquals("172.16.8.243", brokerEndPoint.host)
assertEquals(9091, brokerEndPoint.port)
}
@@ -104,7 +104,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
- val brokerEndPoint =
broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+ val brokerEndPoint =
broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
assertEquals("host1", brokerEndPoint.host)
assertEquals(9093, brokerEndPoint.port)
assertEquals(Some("dc1"), broker.rack)
@@ -124,7 +124,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
- val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
+ val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT"))
assertEquals("host1", brokerEndPoint.host)
assertEquals(9092, brokerEndPoint.port)
assertEquals(None, broker.rack)
@@ -143,7 +143,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
- val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
+ val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT"))
assertEquals("host1", brokerEndPoint.host)
assertEquals(9092, brokerEndPoint.port)
assertEquals(None, broker.rack)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fd6073d..a85a10b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -61,9 +61,9 @@ class KafkaApisTest {
private val txnCoordinator =
EasyMock.createNiceMock(classOf[TransactionCoordinator])
private val controller = EasyMock.createNiceMock(classOf[KafkaController])
private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
- private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
private val metrics = new Metrics()
private val brokerId = 1
+ private val metadataCache = new MetadataCache(brokerId)
private val authorizer: Option[Authorizer] = None
private val clientQuotaManager =
EasyMock.createNiceMock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager =
EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
@@ -363,6 +363,61 @@ class KafkaApisTest {
testConsumerListOffsetLatest(IsolationLevel.READ_COMMITTED)
}
+ /**
+ * Verifies that the metadata response is correct if the broker listeners
are inconsistent (i.e. one broker has
+ * more listeners than another) and the request is sent on the listener that
exists in both brokers.
+ */
+ @Test
+ def
testMetadataRequestOnSharedListenerWithInconsistentListenersAcrossBrokers():
Unit = {
+ val (plaintextListener, _) = updateMetadataCacheWithInconsistentListeners()
+ val response =
sendMetadataRequestWithInconsistentListeners(plaintextListener)
+ assertEquals(Set(0, 1), response.brokers.asScala.map(_.id).toSet)
+ }
+
+ /*
+ * Verifies that the metadata response is correct if the broker listeners
are inconsistent (i.e. one broker has
+ * more listeners than another) and the request is sent on the listener that
exists in one broker.
+ */
+ @Test
+ def
testMetadataRequestOnDistinctListenerWithInconsistentListenersAcrossBrokers():
Unit = {
+ val (_, anotherListener) = updateMetadataCacheWithInconsistentListeners()
+ val response =
sendMetadataRequestWithInconsistentListeners(anotherListener)
+ assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
+ }
+
+ /**
+ * Return pair of listener names in the metadataCache: PLAINTEXT and
LISTENER2 respectively.
+ */
+ private def updateMetadataCacheWithInconsistentListeners(): (ListenerName,
ListenerName) = {
+ import UpdateMetadataRequest.{Broker => UBroker}
+ import UpdateMetadataRequest.{EndPoint => UEndPoint}
+ val plaintextListener =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ val anotherListener = new ListenerName("LISTENER2")
+ val brokers = Set(
+ new UBroker(0, Seq(new UEndPoint("broker0", 9092,
SecurityProtocol.PLAINTEXT, plaintextListener),
+ new UEndPoint("broker0", 9093, SecurityProtocol.PLAINTEXT,
anotherListener)).asJava, "rack"),
+ new UBroker(1, Seq(new UEndPoint("broker1", 9092,
SecurityProtocol.PLAINTEXT, plaintextListener)).asJava,
+ "rack")
+ )
+ val updateMetadataRequest = new
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+ 0, Map.empty[TopicPartition,
UpdateMetadataRequest.PartitionState].asJava, brokers.asJava).build()
+ metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
+ (plaintextListener, anotherListener)
+ }
+
+ private def sendMetadataRequestWithInconsistentListeners(requestListener:
ListenerName): MetadataResponse = {
+ val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+ val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
+ expectThrottleCallbackAndInvoke(capturedThrottleCallback)
+
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+ EasyMock.replay(clientRequestQuotaManager, requestChannel)
+
+ val (metadataRequest, requestChannelRequest) =
buildRequest(MetadataRequest.Builder.allTopics, requestListener)
+ createKafkaApis().handleTopicMetadataRequest(requestChannelRequest)
+
+ readResponse(ApiKeys.METADATA, metadataRequest,
capturedResponse).asInstanceOf[MetadataResponse]
+ }
+
private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel):
Unit = {
val tp = new TopicPartition("foo", 0)
val latestOffset = 15L
@@ -400,14 +455,16 @@ class KafkaApisTest {
buildRequest(requestBuilder)
}
- private def buildRequest[T <: AbstractRequest](builder:
AbstractRequest.Builder[T]): (T, RequestChannel.Request) = {
+ private def buildRequest[T <: AbstractRequest](builder:
AbstractRequest.Builder[T],
+ listenerName: ListenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T,
RequestChannel.Request) = {
+
val request = builder.build()
val buffer = request.serialize(new RequestHeader(builder.apiKey,
request.version, "", 0))
// read the header from the buffer first so that the body can be read next
from the Request constructor
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost,
KafkaPrincipal.ANONYMOUS,
- new ListenerName(""), SecurityProtocol.PLAINTEXT)
+ listenerName, SecurityProtocol.PLAINTEXT)
(request, new RequestChannel.Request(processor = 1, context = context,
startTimeNanos = 0,
MemoryPool.NONE, buffer, requestChannelMetrics))
}
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 9cbc260..6768b84 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -134,7 +134,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost",
TestUtils.boundPort(s), listenerName,
securityProtocol))
- val nodes = brokers.map(_.getNode(listenerName))
+ val nodes = brokers.map(_.node(listenerName))
val controllerContext = new ControllerContext
controllerContext.liveBrokers = brokers.toSet
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 7781801..dc6ff9e 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -214,7 +214,7 @@ class LeaderEpochIntegrationTest extends
ZooKeeperTestHarness with Logging {
}
private def sender(from: KafkaServer, to: KafkaServer): BlockingSend = {
- val endPoint = from.metadataCache.getAliveBrokers.find(_.id ==
to.config.brokerId).get.getBrokerEndPoint(from.config.interBrokerListenerName)
+ val endPoint = from.metadataCache.getAliveBrokers.find(_.id ==
to.config.brokerId).get.brokerEndPoint(from.config.interBrokerListenerName)
new ReplicaFetcherBlockingSend(endPoint, from.config, new Metrics(), new
SystemTime(), 42, "TestFetcher", new LogContext())
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].