This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f19cd6c KAFKA-10312; Fix error code returned in Metadata response
when leader is not available (#9112)
f19cd6c is described below
commit f19cd6ca48ccd2971948c9589279bb8ee20e5d88
Author: Raman Verma <[email protected]>
AuthorDate: Mon Aug 24 11:18:59 2020 -0700
KAFKA-10312; Fix error code returned in Metadata response when leader is
not available (#9112)
MetadataCache#getPartitionMetadata returns an error when the topic's leader
id is present in the MetadataCache but no listener endpoint is present for
that leader.
For older versions, LEADER_NOT_AVAILABLE is returned while
LISTENER_NOT_FOUND is
returned for new metadata versions.
The problem is that getPartitionMetadata was looking up MetadataCache's
host brokerId rather
than the topic's leader id while determining what error to return. This
could result in the call returning LISTENER_NOT_FOUND when it should
have returned LEADER_NOT_AVAILABLE. This commit corrects this behavior.
Unit tests were already present to test out the error codes returned
under different situations but they were giving out a false positive.
The test was using the same broker id for both the MetadataCache's host as
well as for the topic's leader. The error only manifests when the
MetadataCache's host id differs from the leader id, so the test has been
improved to use distinct ids.
This commit also consolidated a couple of related tests to reduce code
duplication.
Reviewers: Jason Gustafson <[email protected]>
---
.../main/scala/kafka/server/MetadataCache.scala | 2 +-
.../unit/kafka/server/MetadataCacheTest.scala | 49 +++++++---------------
2 files changed, 15 insertions(+), 36 deletions(-)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 79d84ea..8460446 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -97,7 +97,7 @@ class MetadataCache(brokerId: Int) extends Logging {
maybeLeader match {
case None =>
- val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we
are already holding the read lock
+ val error = if (!snapshot.aliveBrokers.contains(leaderBrokerId)) {
// we are already holding the read lock
debug(s"Error while fetching metadata for $topicPartition:
leader not available")
Errors.LEADER_NOT_AVAILABLE
} else {
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index d252936..1a62065 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -154,14 +154,18 @@ class MetadataCacheTest {
.setPort(9092)
.setSecurityProtocol(securityProtocol.id)
.setListener(listenerName.value)).asJava))
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers,
listenerName,
+ val metadataCacheBrokerId = 0
+ // leader is not available. expect LEADER_NOT_AVAILABLE for any metadata
version.
+
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId,
brokers, listenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners =
false)
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers,
listenerName,
+
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId,
brokers, listenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners =
true)
}
@Test
def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = {
+ // when listener name is not present in the metadata cache for a broker,
getTopicMetadata should
+ // return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old and
new versions respectively.
val plaintextListenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val sslListenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
val broker0Endpoints = Seq(
@@ -187,49 +191,24 @@ class MetadataCacheTest {
new UpdateMetadataBroker()
.setId(1)
.setEndpoints(broker1Endpoints.asJava))
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers,
sslListenerName,
+ val metadataCacheBrokerId = 0
+ // leader available in cache but listener name not present. expect
LISTENER_NOT_FOUND error for new metadata version
+
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId,
brokers, sslListenerName,
leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
- }
-
- @Test
- def
getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit
= {
- val plaintextListenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
- val sslListenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
- val broker0Endpoints = Seq(
- new UpdateMetadataEndpoint()
- .setHost("host0")
- .setPort(9092)
- .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
- .setListener(plaintextListenerName.value),
- new UpdateMetadataEndpoint()
- .setHost("host0")
- .setPort(9093)
- .setSecurityProtocol(SecurityProtocol.SSL.id)
- .setListener(sslListenerName.value))
- val broker1Endpoints = Seq(new UpdateMetadataEndpoint()
- .setHost("host1")
- .setPort(9092)
- .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
- .setListener(plaintextListenerName.value))
- val brokers = Seq(
- new UpdateMetadataBroker()
- .setId(0)
- .setEndpoints(broker0Endpoints.asJava),
- new UpdateMetadataBroker()
- .setId(1)
- .setEndpoints(broker1Endpoints.asJava))
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers,
sslListenerName,
+ // leader available in cache but listener name not present. expect
LEADER_NOT_AVAILABLE error for old metadata version
+
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId,
brokers, sslListenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners =
false)
}
- private def
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers:
Seq[UpdateMetadataBroker],
+ private def
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId:
Int,
+
brokers: Seq[UpdateMetadataBroker],
listenerName: ListenerName,
leader:
Int,
expectedError: Errors,
errorUnavailableListeners: Boolean): Unit = {
val topic = "topic"
- val cache = new MetadataCache(1)
+ val cache = new MetadataCache(metadataCacheBrokerId)
val zkVersion = 3
val controllerId = 2