This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 4596ca0 KAFKA-8954; Topic existence check is wrongly implemented in
the DeleteOffset API (KIP-496) (#7406)
4596ca0 is described below
commit 4596ca0a8036b46d27a8f449a2068bf89ad00033
Author: David Jacot <[email protected]>
AuthorDate: Wed Oct 9 17:36:49 2019 +0200
KAFKA-8954; Topic existence check is wrongly implemented in the
DeleteOffset API (KIP-496) (#7406)
This patch changes the way topic existence is checked in the DeleteOffset
API. Previously, it relied on the committed offsets. Now, it relies on the
metadata cache, which is more reliable.
Reviewers: Jason Gustafson <[email protected]>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 14 +----
core/src/main/scala/kafka/server/KafkaApis.scala | 32 +++++++----
.../coordinator/group/GroupCoordinatorTest.scala | 40 ++++++-------
.../scala/unit/kafka/server/KafkaApisTest.scala | 65 +++++++++++++++++++++-
4 files changed, 104 insertions(+), 47 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 64884ec..22f15f9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -565,22 +565,14 @@ class GroupCoordinator(val brokerId: Int,
Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR
case Empty =>
- val (knownPartitions, unknownPartitions) =
- partitions.partition(tp => group.offset(tp).nonEmpty)
-
- partitionEligibleForDeletion = knownPartitions
- partitionErrors = unknownPartitions.map(_ ->
Errors.UNKNOWN_TOPIC_OR_PARTITION).toMap
+ partitionEligibleForDeletion = partitions
case PreparingRebalance | CompletingRebalance | Stable if
group.isConsumerGroup =>
- val (knownPartitions, unknownPartitions) =
- partitions.partition(tp => group.offset(tp).nonEmpty)
-
val (consumed, notConsumed) =
- knownPartitions.partition(tp =>
group.isSubscribedToTopic(tp.topic()))
+ partitions.partition(tp =>
group.isSubscribedToTopic(tp.topic()))
partitionEligibleForDeletion = notConsumed
- partitionErrors = consumed.map(_ ->
Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap ++
- unknownPartitions.map(_ ->
Errors.UNKNOWN_TOPIC_OR_PARTITION).toMap
+ partitionErrors = consumed.map(_ ->
Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap
case _ =>
groupError = Errors.NON_EMPTY_GROUP
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c7747b6..9a0ebd7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2688,27 +2688,35 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupId = offsetDeleteRequest.data.groupId
if (authorize(request, DELETE, GROUP, groupId)) {
- val topicPartitions = offsetDeleteRequest.data.topics.asScala.flatMap {
topic =>
- topic.partitions.asScala.map { partition =>
- new TopicPartition(topic.name, partition.partitionIndex)
+ val authorizedTopics = filterAuthorized(request, READ, TOPIC,
+ offsetDeleteRequest.data.topics.asScala.map(_.name).toSeq)
+
+ val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
+ val topicPartitions = mutable.ArrayBuffer[TopicPartition]()
+
+ for (topic <- offsetDeleteRequest.data.topics.asScala) {
+ for (partition <- topic.partitions.asScala) {
+ val tp = new TopicPartition(topic.name, partition.partitionIndex)
+ if (!authorizedTopics.contains(topic.name))
+ topicPartitionErrors(tp) = Errors.TOPIC_AUTHORIZATION_FAILED
+ else if (!metadataCache.contains(tp))
+ topicPartitionErrors(tp) = Errors.UNKNOWN_TOPIC_OR_PARTITION
+ else
+ topicPartitions += tp
}
- }.toSeq
-
- val authorizedTopics = filterAuthorized(request, READ, TOPIC,
topicPartitions.map(_.topic))
- val (authorizedTopicPartitions, unauthorizedTopicPartitions) =
topicPartitions.partition { topicPartition =>
- authorizedTopics.contains(topicPartition.topic)
}
- val unauthorizedTopicPartitionsErrors =
unauthorizedTopicPartitions.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED)
- val (groupError, authorizedTopicPartitionsErrors) =
groupCoordinator.handleDeleteOffsets(groupId, authorizedTopicPartitions)
- val topicPartitionsErrors = unauthorizedTopicPartitionsErrors ++
authorizedTopicPartitionsErrors
+ val (groupError, authorizedTopicPartitionsErrors) =
groupCoordinator.handleDeleteOffsets(
+ groupId, topicPartitions)
+
+ topicPartitionErrors ++= authorizedTopicPartitionsErrors
sendResponseMaybeThrottle(request, requestThrottleMs => {
if (groupError != Errors.NONE)
offsetDeleteRequest.getErrorResponse(requestThrottleMs, groupError)
else {
val topics = new
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
- topicPartitionsErrors.groupBy(_._1.topic).map { case (topic,
topicPartitions) =>
+ topicPartitionErrors.groupBy(_._1.topic).map { case (topic,
topicPartitions) =>
val partitions = new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
topicPartitions.map { case (topicPartition, error) =>
partitions.add(
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 91a963d..cd85e0a 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -2827,7 +2827,7 @@ class GroupCoordinatorTest {
val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(tp))
assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError)
- assert(topics.isEmpty)
+ assertTrue(topics.isEmpty)
}
@Test
@@ -2838,7 +2838,7 @@ class GroupCoordinatorTest {
val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(tp))
assertEquals(Errors.NON_EMPTY_GROUP, groupError)
- assert(topics.isEmpty)
+ assertTrue(topics.isEmpty)
}
@Test
@@ -2868,7 +2868,7 @@ class GroupCoordinatorTest {
val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
verifyLeaveGroupResult(leaveGroupResults)
- assert(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))
+
assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))
val groupTopicPartition = new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -2882,7 +2882,7 @@ class GroupCoordinatorTest {
val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(t1p0))
assertEquals(Errors.NONE, groupError)
- assert(topics.size == 1)
+ assertEquals(1, topics.size)
assertEquals(Some(Errors.NONE), topics.get(t1p0))
val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId,
Some(Seq(t1p0, t2p0)))
@@ -2900,21 +2900,19 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId,
joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult._2)
- val t1p0 = new TopicPartition("foo", 0)
- val t2p0 = new TopicPartition("bar", 0)
+ val tp = new TopicPartition("foo", 0)
val offset = offsetAndMetadata(37)
EasyMock.reset(replicaManager)
val validOffsetCommitResult = commitOffsets(groupId,
joinGroupResult.memberId, joinGroupResult.generationId,
- Map(t1p0 -> offset))
- assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
+ Map(tp -> offset))
+ assertEquals(Errors.NONE, validOffsetCommitResult(tp))
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(t1p0, t2p0))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(tp))
assertEquals(Errors.NONE, groupError)
- assert(topics.size == 2)
- assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(t1p0))
- assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), topics.get(t2p0))
+ assertEquals(1, topics.size)
+ assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(tp))
}
@Test
@@ -2927,7 +2925,7 @@ class GroupCoordinatorTest {
val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(tp))
assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError)
- assert(topics.size == 0)
+ assertTrue(topics.isEmpty)
}
@Test
@@ -2943,7 +2941,6 @@ class GroupCoordinatorTest {
val t1p0 = new TopicPartition("foo", 0)
val t2p0 = new TopicPartition("bar", 0)
- val t3p0 = new TopicPartition("unknown", 0)
val offset = offsetAndMetadata(37)
EasyMock.reset(replicaManager)
@@ -2957,7 +2954,7 @@ class GroupCoordinatorTest {
val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
verifyLeaveGroupResult(leaveGroupResults)
- assert(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))
+
assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))
val groupTopicPartition = new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -2968,12 +2965,11 @@ class GroupCoordinatorTest {
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(t1p0, t3p0))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(t1p0))
assertEquals(Errors.NONE, groupError)
- assert(topics.size == 2)
+ assertEquals(1, topics.size)
assertEquals(Some(Errors.NONE), topics.get(t1p0))
- assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), topics.get(t3p0))
val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId,
Some(Seq(t1p0, t2p0)))
@@ -2997,7 +2993,6 @@ class GroupCoordinatorTest {
val t1p0 = new TopicPartition("foo", 0)
val t2p0 = new TopicPartition("bar", 0)
- val t3p0 = new TopicPartition("unknown", 0)
val offset = offsetAndMetadata(37)
EasyMock.reset(replicaManager)
@@ -3006,7 +3001,7 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
assertEquals(Errors.NONE, validOffsetCommitResult(t2p0))
-
assert(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Stable)))
+
assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Stable)))
val groupTopicPartition = new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -3017,13 +3012,12 @@ class GroupCoordinatorTest {
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(t1p0, t2p0, t3p0))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(t1p0, t2p0))
assertEquals(Errors.NONE, groupError)
- assert(topics.size == 3)
+ assertEquals(2, topics.size)
assertEquals(Some(Errors.NONE), topics.get(t1p0))
assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(t2p0))
- assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), topics.get(t3p0))
val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId,
Some(Seq(t1p0, t2p0)))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index bf1a8c4..66bed52 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -46,9 +46,10 @@ import org.apache.kafka.common.requests.{FetchMetadata =>
JFetchMetadata, _}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.easymock.{Capture, EasyMock, IAnswer}
import EasyMock._
-import org.apache.kafka.common.message.{HeartbeatRequestData,
JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData,
SyncGroupRequestData, TxnOffsetCommitRequestData}
+import org.apache.kafka.common.message.{HeartbeatRequestData,
JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData,
OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData}
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
+import
org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition,
OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
import
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.server.authorizer.Authorizer
@@ -393,6 +394,68 @@ class KafkaApisTest {
testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
+ @Test
+ def testOffsetDeleteWithInvalidPartition(): Unit = {
+ val group = "groupId"
+ val topic = "topic"
+ setupBasicMetadataCache(topic, numPartitions = 1)
+
+ def checkInvalidPartition(invalidPartitionId: Int): Unit = {
+ EasyMock.reset(groupCoordinator, replicaManager,
clientRequestQuotaManager, requestChannel)
+
+ val topics = new OffsetDeleteRequestTopicCollection()
+ topics.add(new OffsetDeleteRequestTopic()
+ .setName(topic)
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestPartition().setPartitionIndex(invalidPartitionId))))
+ val (offsetDeleteRequest, request) = buildRequest(new
OffsetDeleteRequest.Builder(
+ new OffsetDeleteRequestData()
+ .setGroupId(group)
+ .setTopics(topics)
+ ))
+
+ val capturedResponse = expectNoThrottling()
+ EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group),
EasyMock.eq(Seq.empty)))
+ .andReturn((Errors.NONE, Map.empty))
+ EasyMock.replay(groupCoordinator, replicaManager,
clientRequestQuotaManager, requestChannel)
+
+ createKafkaApis().handleOffsetDeleteRequest(request)
+
+ val response = readResponse(ApiKeys.OFFSET_DELETE, offsetDeleteRequest,
capturedResponse)
+ .asInstanceOf[OffsetDeleteResponse]
+
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+
Errors.forCode(response.data.topics.find(topic).partitions.find(invalidPartitionId).errorCode()))
+ }
+
+ checkInvalidPartition(-1)
+ checkInvalidPartition(1) // topic has only one partition
+ }
+
+ @Test
+ def testOffsetDeleteWithInvalidGroup(): Unit = {
+ val group = "groupId"
+
+ EasyMock.reset(groupCoordinator, replicaManager,
clientRequestQuotaManager, requestChannel)
+
+ val (offsetDeleteRequest, request) = buildRequest(new
OffsetDeleteRequest.Builder(
+ new OffsetDeleteRequestData()
+ .setGroupId(group)
+ ))
+
+ val capturedResponse = expectNoThrottling()
+ EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group),
EasyMock.eq(Seq.empty)))
+ .andReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty))
+ EasyMock.replay(groupCoordinator, replicaManager,
clientRequestQuotaManager, requestChannel)
+
+ createKafkaApis().handleOffsetDeleteRequest(request)
+
+ val response = readResponse(ApiKeys.OFFSET_DELETE, offsetDeleteRequest,
capturedResponse)
+ .asInstanceOf[OffsetDeleteResponse]
+
+ assertEquals(Errors.GROUP_ID_NOT_FOUND,
Errors.forCode(response.data.errorCode()))
+ }
+
private def testListOffsetFailedGetLeaderReplica(error: Errors): Unit = {
val tp = new TopicPartition("foo", 0)
val isolationLevel = IsolationLevel.READ_UNCOMMITTED