This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 49226721c0 KAFKA-13861; Fix the validateOnly behavior for
CreatePartitions requests in KRaft mode (#12106)
49226721c0 is described below
commit 49226721c0dc5e5b327e0754e01c367990b43758
Author: Akhilesh Chaganti <[email protected]>
AuthorDate: Wed May 4 10:31:46 2022 -0700
KAFKA-13861; Fix the validateOnly behavior for CreatePartitions requests in
KRaft mode (#12106)
The KRaft implementation of `CreatePartitions` ignores the `validateOnly`
flag in the request and creates the partitions whenever validation succeeds.
This change fixes the behavior so that, when the `validateOnly` flag is true,
the request is only validated and no partitions are created.
Reviewers: Divij Vaidya <[email protected]>, dengziming
<[email protected]>, Jason Gustafson <[email protected]>
---
.../main/scala/kafka/server/ControllerApis.scala | 2 +-
core/src/test/java/kafka/test/MockController.java | 7 +-
.../kafka/api/BaseAdminIntegrationTest.scala | 2 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 122 ++++++++++++++++-----
.../src/test/scala/kafka/utils/TestInfoUtils.scala | 2 +
.../unit/kafka/server/ControllerApisTest.scala | 32 ++++--
.../org/apache/kafka/controller/Controller.java | 10 +-
.../apache/kafka/controller/QuorumController.java | 20 +++-
.../controller/ReplicationControlManager.java | 2 +-
.../kafka/controller/QuorumControllerTest.java | 4 +-
10 files changed, 155 insertions(+), 48 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index c31b205530..22c37a09b6 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -742,7 +742,7 @@ class ControllerApis(val requestChannel: RequestChannel,
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
}
}
- controller.createPartitions(deadlineNs, topics).thenApply { results =>
+ controller.createPartitions(deadlineNs, topics,
request.validateOnly).thenApply { results =>
results.forEach(response => responses.add(response))
responses
}
diff --git a/core/src/test/java/kafka/test/MockController.java
b/core/src/test/java/kafka/test/MockController.java
index acb6f90e7f..f18b2cd67c 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -332,8 +332,11 @@ public class MockController implements Controller {
}
@Override
- synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
- createPartitions(long deadlineNs, List<CreatePartitionsTopic>
topicList) {
+ synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(
+ long deadlineNs,
+ List<CreatePartitionsTopic> topicList,
+ boolean validateOnly
+ ) {
if (!active) {
CompletableFuture<List<CreatePartitionsTopicResult>> future = new
CompletableFuture<>();
future.completeExceptionally(NOT_CONTROLLER_EXCEPTION);
diff --git
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index e3a7911496..08196f65e5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -53,7 +53,7 @@ abstract class BaseAdminIntegrationTest extends
IntegrationTestHarness with Logg
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- waitUntilBrokerMetadataIsPropagated(servers)
+ waitUntilBrokerMetadataIsPropagated(brokers)
}
@AfterEach
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index fc14de187c..7d27a57ed8 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -30,7 +30,7 @@ import kafka.log.LogConfig
import kafka.security.authorizer.AclEntry
import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
import kafka.utils.TestUtils._
-import kafka.utils.{Log4jController, TestUtils}
+import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin._
@@ -45,6 +45,8 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType,
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica,
Uuid}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.slf4j.LoggerFactory
import scala.annotation.nowarn
@@ -74,7 +76,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
brokerLoggerConfigResource = new ConfigResource(
- ConfigResource.Type.BROKER_LOGGER, servers.head.config.brokerId.toString)
+ ConfigResource.Type.BROKER_LOGGER, brokers.head.config.brokerId.toString)
}
@AfterEach
@@ -421,8 +423,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
checkValidAlterConfigs(client, topicResource1, topicResource2)
}
- @Test
- def testCreatePartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreatePartitions(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
@@ -486,7 +489,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
var e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when newCount is a
decrease")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic currently has 3 partitions, which is higher than the
requested 1.", e.getCause.getMessage, desc)
+ var exceptionMsgStr = if (isKRaftTest()) {
+ "The topic create-partitions-topic-1 currently has 3 partition(s); 1
would not be an increase."
+ } else {
+ "Topic currently has 3 partitions, which is higher than the requested
1."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try a newCount which would be a noop (without assignment)
@@ -495,7 +503,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic2).get,
() => s"$desc: Expect InvalidPartitionsException when requesting a
noop")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic already has 3 partitions.", e.getCause.getMessage,
desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "Topic already has 3 partition(s)."
+ } else {
+ "Topic already has 3 partitions."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic2, Some(3)), desc)
// try a newCount which would be a noop (where the assignment matches
current state)
@@ -503,7 +516,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, option)
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic2).get)
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic already has 3 partitions.", e.getCause.getMessage,
desc)
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic2, Some(3)), desc)
// try a newCount which would be a noop (where the assignment doesn't
match current state)
@@ -511,7 +524,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
NewPartitions.increaseTo(3,
newPartition2Assignments.asScala.reverse.toList.asJava)).asJava, option)
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic2).get)
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic already has 3 partitions.", e.getCause.getMessage,
desc)
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic2, Some(3)), desc)
// try a bad topic name
@@ -521,7 +534,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(unknownTopic).get,
() => s"$desc: Expect InvalidTopicException when using an unknown
topic")
assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException],
desc)
- assertEquals("The topic 'an-unknown-topic' does not exist.",
e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "This server does not host this topic-partition."
+ } else {
+ "The topic 'an-unknown-topic' does not exist."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
// try an invalid newCount
alterResult = client.createPartitions(Map(topic1 ->
@@ -529,7 +547,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when newCount is
invalid")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic currently has 3 partitions, which is higher than the
requested -22.", e.getCause.getMessage,
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The topic create-partitions-topic-1 currently has 3 partition(s); -22
would not be an increase."
+ } else {
+ "Topic currently has 3 partitions, which is higher than the requested
-22."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage,
desc)
assertEquals(3, numPartitions(topic1), desc)
@@ -539,9 +562,14 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when #brokers !=
replication factor")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException],
desc)
- assertEquals("Inconsistent replication factor between partitions,
partition 0 has 1 " +
- "while partitions [3] have replication factors [2], respectively.",
- e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The manual partition assignment includes a partition with 2
replica(s), but this is not " +
+ "consistent with previous partitions, which have 1 replica(s)."
+ } else {
+ "Inconsistent replication factor between partitions, partition 0 has 1
while partitions [3] " +
+ "have replication factors [2], respectively."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try #assignments < with the increase
@@ -550,7 +578,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when
#assignments != newCount - oldCount")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException],
desc)
- assertEquals("Increasing the number of partitions by 3 but 1 assignments
provided.", e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "Attempted to add 3 additional partition(s), but only 1 assignment(s)
were specified."
+ } else {
+ "Increasing the number of partitions by 3 but 1 assignments provided."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try #assignments > with the increase
@@ -558,8 +591,13 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava,
option)
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when
#assignments != newCount - oldCount")
+ exceptionMsgStr = if (isKRaftTest()) {
+ "Attempted to add 1 additional partition(s), but only 2 assignment(s)
were specified."
+ } else {
+ "Increasing the number of partitions by 1 but 2 assignments provided."
+ }
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException],
desc)
- assertEquals("Increasing the number of partitions by 1 but 2 assignments
provided.", e.getCause.getMessage, desc)
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try with duplicate brokers in assignments
@@ -568,8 +606,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when
assignments has duplicate brokers")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException],
desc)
- assertEquals("Duplicate brokers not allowed in replica assignment: 1, 1
for partition id 3.",
- e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The manual partition assignment includes the broker 1 more than once."
+ } else {
+ "Duplicate brokers not allowed in replica assignment: 1, 1 for
partition id 3."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try assignments with differently sized inner lists
@@ -578,8 +620,14 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when
assignments have differently sized inner lists")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException],
desc)
- assertEquals("Inconsistent replication factor between partitions,
partition 0 has 1 " +
- "while partitions [4] have replication factors [2], respectively.",
e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The manual partition assignment includes a partition with 2
replica(s), but this is not " +
+ "consistent with previous partitions, which have 1 replica(s)."
+ } else {
+ "Inconsistent replication factor between partitions, partition 0 has 1
" +
+ "while partitions [4] have replication factors [2], respectively."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try assignments with unknown brokers
@@ -588,7 +636,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when
assignments contains an unknown broker")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException],
desc)
- assertEquals("Unknown broker(s) in replica assignment: 12.",
e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The manual partition assignment includes broker 12, but no such
broker is registered."
+ } else {
+ "Unknown broker(s) in replica assignment: 12."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try with empty assignments
@@ -597,7 +650,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when
assignments is empty")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException],
desc)
- assertEquals("Increasing the number of partitions by 1 but 0 assignments
provided.", e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "Attempted to add 1 additional partition(s), but only 0 assignment(s)
were specified."
+ } else {
+ "Increasing the number of partitions by 1 but 0 assignments provided."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
}
@@ -610,18 +668,30 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.waitUntilTrue(() => numPartitions(topic1) == 4, "Timed out
waiting for new partitions to appear")
var e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic2).get)
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
- assertEquals("Topic currently has 3 partitions, which is higher than the
requested 2.", e.getCause.getMessage)
+ val exceptionMsgStr = if (isKRaftTest()) {
+ "The topic create-partitions-topic-2 currently has 3 partition(s); 2
would not be an increase."
+ } else {
+ "Topic currently has 3 partitions, which is higher than the requested 2."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage)
assertEquals(3, numPartitions(topic2))
- // finally, try to add partitions to a topic queued for deletion
+ // Delete the topic. Verify addition of partitions to deleted topic is not
possible. In
+ // Zookeeper mode, the topic is queued for deletion. In KRaft, the
deletion occurs
+ // immediately and hence we have a different Exception thrown in the
response.
val deleteResult = client.deleteTopics(asList(topic1))
deleteResult.topicNameValues.get(topic1).get
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(4)).asJava, validateOnly)
e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
- () => "Expect InvalidTopicException when the topic is queued for
deletion")
- assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
- assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
+ () => "Expect InvalidTopicException or UnknownTopicOrPartitionException
when the topic is queued for deletion")
+ if (isKRaftTest()) {
+ assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException],
e.toString)
+ assertEquals("This server does not host this topic-partition.",
e.getCause.getMessage)
+ } else {
+ assertTrue(e.getCause.isInstanceOf[InvalidTopicException], e.toString)
+ assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
+ }
}
@Test
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index ecd656e0fb..0ba22fdebc 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -30,6 +30,8 @@ class EmptyTestInfo extends TestInfo {
}
object TestInfoUtils {
+ final val TestWithParameterizedQuorumName = "{displayName}.quorum={0}"
+
def isKRaft(testInfo: TestInfo): Boolean = {
if (testInfo.getDisplayName().contains("quorum=")) {
if (testInfo.getDisplayName().contains("quorum=kraft")) {
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index c8ce3a1605..906b7a3f58 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -20,9 +20,8 @@ package kafka.server
import java.net.InetAddress
import java.util
import java.util.Collections.singletonList
-import java.util.Properties
+import java.util.{Collections, Properties}
import java.util.concurrent.{CompletableFuture, ExecutionException}
-
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
@@ -58,6 +57,8 @@ import org.apache.kafka.server.authorizer.{Action,
AuthorizableRequestContext, A
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
@@ -706,11 +707,10 @@ class ControllerApisTest {
_ => Set("foo", "bar")))
}
- @Test
- def testCreatePartitionsRequest(): Unit = {
- val controller = new MockController.Builder().
- newInitialTopic("foo", Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
- newInitialTopic("bar", Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")).build()
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testCreatePartitionsRequest(validateOnly: Boolean): Unit = {
+ val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(None, controller)
val request = new CreatePartitionsRequestData()
request.topics().add(new
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))
@@ -718,9 +718,23 @@ class ControllerApisTest {
request.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
request.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
request.topics().add(new
CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
+ request.setValidateOnly(validateOnly)
+
+ // Check if the controller is called correctly with the 'validateOnly'
field set appropriately.
+ when(controller.createPartitions(
+ any(),
+ ArgumentMatchers.eq(
+ Collections.singletonList(
+ new
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))),
+ ArgumentMatchers.eq(validateOnly))).thenReturn(CompletableFuture
+ .completedFuture(Collections.singletonList(
+ new CreatePartitionsTopicResult().setName("foo").
+ setErrorCode(NONE.code()).
+ setErrorMessage(null)
+ )))
assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
- setErrorCode(NONE.code()).
- setErrorMessage(null),
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
new CreatePartitionsTopicResult().setName("bar").
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate topic name."),
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 3d91f67b34..e7c656885b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -259,10 +259,16 @@ public interface Controller extends AclMutator,
AutoCloseable {
* @param deadlineNs The time by which this operation needs to be
complete, before
* we will complete this operation with a timeout.
* @param topics The list of topics to create partitions for.
+ * @param validateOnly If true, the request is validated, but no
partitions will be created.
+ *
* @return A future yielding per-topic results.
*/
- CompletableFuture<List<CreatePartitionsTopicResult>>
- createPartitions(long deadlineNs, List<CreatePartitionsTopic>
topics);
+ CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
+ long deadlineNs,
+ List<CreatePartitionsTopic> topics,
+ boolean validateOnly
+ );
+
/**
* Begin shutting down, but don't block. You must still call close to
clean up all
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 966cfa1dd3..c458df94af 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1563,13 +1563,25 @@ public final class QuorumController implements
Controller {
}
@Override
- public CompletableFuture<List<CreatePartitionsTopicResult>>
- createPartitions(long deadlineNs, List<CreatePartitionsTopic>
topics) {
+ public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(
+ long deadlineNs,
+ List<CreatePartitionsTopic> topics,
+ boolean validateOnly
+ ) {
if (topics.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
- return appendWriteEvent("createPartitions", deadlineNs,
- () -> replicationControl.createPartitions(topics));
+
+ return appendWriteEvent("createPartitions", deadlineNs, () -> {
+ final ControllerResult<List<CreatePartitionsTopicResult>> result =
replicationControl.createPartitions(topics);
+ if (validateOnly) {
+ log.debug("Validate-only CreatePartitions result(s): {}",
result.response());
+ return result.withoutRecords();
+ } else {
+ log.debug("CreatePartitions result(s): {}", result.response());
+ return result;
+ }
+ });
}
@Override
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index edf627eb51..a82f2851cb 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1178,7 +1178,7 @@ public class ReplicationControlManager {
setErrorCode(apiError.error().code()).
setErrorMessage(apiError.message()));
}
- return new ControllerResult<>(records, results, true);
+ return ControllerResult.atomicOf(records, results);
}
void createPartitions(CreatePartitionsTopic topic,
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 963c99eeff..08933d0e8e 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -782,7 +782,7 @@ public class QuorumControllerTest {
controller.findTopicNames(now,
Collections.singletonList(Uuid.ZERO_UUID));
CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitionsFuture =
controller.createPartitions(now, Collections.singletonList(
- new CreatePartitionsTopic()));
+ new CreatePartitionsTopic()), false);
CompletableFuture<ElectLeadersResponseData> electLeadersFuture
=
controller.electLeaders(new
ElectLeadersRequestData().setTimeoutMs(0).
setTopicPartitions(null));
@@ -836,7 +836,7 @@ public class QuorumControllerTest {
CompletableFuture<Map<Uuid, ResultOrError<String>>>
findTopicNamesFuture =
controller.findTopicNames(deadlineMs,
Collections.emptyList());
CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitionsFuture =
- controller.createPartitions(deadlineMs,
Collections.emptyList());
+ controller.createPartitions(deadlineMs,
Collections.emptyList(), false);
CompletableFuture<ElectLeadersResponseData> electLeadersFuture
=
controller.electLeaders(new
ElectLeadersRequestData().setTimeoutMs(120000));
CompletableFuture<AlterPartitionReassignmentsResponseData>
alterReassignmentsFuture =