This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 49226721c0 KAFKA-13861; Fix the validateOnly behavior for 
CreatePartitions requests in KRaft mode (#12106)
49226721c0 is described below

commit 49226721c0dc5e5b327e0754e01c367990b43758
Author: Akhilesh Chaganti <[email protected]>
AuthorDate: Wed May 4 10:31:46 2022 -0700

    KAFKA-13861; Fix the validateOnly behavior for CreatePartitions requests in 
KRaft mode (#12106)
    
    The KRaft implementation of the `CreatePartitions` API ignored the
    `validateOnly` flag in the request and created the partitions whenever
    validation succeeded. With this fix, when the `validateOnly` flag is true
    the controller still validates the request but does not create any
    partitions.
    
    Reviewers: Divij Vaidya <[email protected]>, dengziming 
<[email protected]>, Jason Gustafson <[email protected]>
---
 .../main/scala/kafka/server/ControllerApis.scala   |   2 +-
 core/src/test/java/kafka/test/MockController.java  |   7 +-
 .../kafka/api/BaseAdminIntegrationTest.scala       |   2 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 122 ++++++++++++++++-----
 .../src/test/scala/kafka/utils/TestInfoUtils.scala |   2 +
 .../unit/kafka/server/ControllerApisTest.scala     |  32 ++++--
 .../org/apache/kafka/controller/Controller.java    |  10 +-
 .../apache/kafka/controller/QuorumController.java  |  20 +++-
 .../controller/ReplicationControlManager.java      |   2 +-
 .../kafka/controller/QuorumControllerTest.java     |   4 +-
 10 files changed, 155 insertions(+), 48 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index c31b205530..22c37a09b6 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -742,7 +742,7 @@ class ControllerApis(val requestChannel: RequestChannel,
           setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
       }
     }
-    controller.createPartitions(deadlineNs, topics).thenApply { results =>
+    controller.createPartitions(deadlineNs, topics, 
request.validateOnly).thenApply { results =>
       results.forEach(response => responses.add(response))
       responses
     }
diff --git a/core/src/test/java/kafka/test/MockController.java 
b/core/src/test/java/kafka/test/MockController.java
index acb6f90e7f..f18b2cd67c 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -332,8 +332,11 @@ public class MockController implements Controller {
     }
 
     @Override
-    synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
-            createPartitions(long deadlineNs, List<CreatePartitionsTopic> 
topicList) {
+    synchronized public CompletableFuture<List<CreatePartitionsTopicResult>> 
createPartitions(
+        long deadlineNs,
+        List<CreatePartitionsTopic> topicList,
+        boolean validateOnly
+    ) {
         if (!active) {
             CompletableFuture<List<CreatePartitionsTopicResult>> future = new 
CompletableFuture<>();
             future.completeExceptionally(NOT_CONTROLLER_EXCEPTION);
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index e3a7911496..08196f65e5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -53,7 +53,7 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    waitUntilBrokerMetadataIsPropagated(servers)
+    waitUntilBrokerMetadataIsPropagated(brokers)
   }
 
   @AfterEach
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index fc14de187c..7d27a57ed8 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -30,7 +30,7 @@ import kafka.log.LogConfig
 import kafka.security.authorizer.AclEntry
 import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
-import kafka.utils.{Log4jController, TestUtils}
+import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.HostResolver
 import org.apache.kafka.clients.admin._
@@ -45,6 +45,8 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{ConsumerGroupState, ElectionType, 
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, 
Uuid}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.slf4j.LoggerFactory
 
 import scala.annotation.nowarn
@@ -74,7 +76,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
     brokerLoggerConfigResource = new ConfigResource(
-      ConfigResource.Type.BROKER_LOGGER, servers.head.config.brokerId.toString)
+      ConfigResource.Type.BROKER_LOGGER, brokers.head.config.brokerId.toString)
   }
 
   @AfterEach
@@ -421,8 +423,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     checkValidAlterConfigs(client, topicResource1, topicResource2)
   }
 
-  @Test
-  def testCreatePartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreatePartitions(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     // Create topics
@@ -486,7 +489,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       var e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidPartitionsException when newCount is a 
decrease")
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic currently has 3 partitions, which is higher than the 
requested 1.", e.getCause.getMessage, desc)
+      var exceptionMsgStr = if (isKRaftTest()) {
+        "The topic create-partitions-topic-1 currently has 3 partition(s); 1 
would not be an increase."
+      } else {
+        "Topic currently has 3 partitions, which is higher than the requested 
1."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try a newCount which would be a noop (without assignment)
@@ -495,7 +503,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic2).get,
         () => s"$desc: Expect InvalidPartitionsException when requesting a 
noop")
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, 
desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "Topic already has 3 partition(s)."
+      } else {
+        "Topic already has 3 partitions."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic2, Some(3)), desc)
 
       // try a newCount which would be a noop (where the assignment matches 
current state)
@@ -503,7 +516,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, option)
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic2).get)
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, 
desc)
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic2, Some(3)), desc)
 
       // try a newCount which would be a noop (where the assignment doesn't 
match current state)
@@ -511,7 +524,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         NewPartitions.increaseTo(3, 
newPartition2Assignments.asScala.reverse.toList.asJava)).asJava, option)
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic2).get)
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, 
desc)
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic2, Some(3)), desc)
 
       // try a bad topic name
@@ -521,7 +534,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(unknownTopic).get,
         () => s"$desc: Expect InvalidTopicException when using an unknown 
topic")
       assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], 
desc)
-      assertEquals("The topic 'an-unknown-topic' does not exist.", 
e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "This server does not host this topic-partition."
+      } else {
+        "The topic 'an-unknown-topic' does not exist."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
 
       // try an invalid newCount
       alterResult = client.createPartitions(Map(topic1 ->
@@ -529,7 +547,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidPartitionsException when newCount is 
invalid")
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic currently has 3 partitions, which is higher than the 
requested -22.", e.getCause.getMessage,
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The topic create-partitions-topic-1 currently has 3 partition(s); -22 
would not be an increase."
+      } else {
+        "Topic currently has 3 partitions, which is higher than the requested 
-22."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage,
         desc)
       assertEquals(3, numPartitions(topic1), desc)
 
@@ -539,9 +562,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidPartitionsException when #brokers != 
replication factor")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], 
desc)
-      assertEquals("Inconsistent replication factor between partitions, 
partition 0 has 1 " +
-        "while partitions [3] have replication factors [2], respectively.",
-        e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The manual partition assignment includes a partition with 2 
replica(s), but this is not " +
+          "consistent with previous partitions, which have 1 replica(s)."
+      } else {
+        "Inconsistent replication factor between partitions, partition 0 has 1 
while partitions [3] " +
+          "have replication factors [2], respectively."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try #assignments < with the increase
@@ -550,7 +578,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when 
#assignments != newCount - oldCount")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], 
desc)
-      assertEquals("Increasing the number of partitions by 3 but 1 assignments 
provided.", e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "Attempted to add 3 additional partition(s), but only 1 assignment(s) 
were specified."
+      } else {
+        "Increasing the number of partitions by 3 but 1 assignments provided."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try #assignments > with the increase
@@ -558,8 +591,13 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, 
option)
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when 
#assignments != newCount - oldCount")
+      exceptionMsgStr = if (isKRaftTest()) {
+        "Attempted to add 1 additional partition(s), but only 2 assignment(s) 
were specified."
+      } else {
+        "Increasing the number of partitions by 1 but 2 assignments provided."
+      }
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], 
desc)
-      assertEquals("Increasing the number of partitions by 1 but 2 assignments 
provided.", e.getCause.getMessage, desc)
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try with duplicate brokers in assignments
@@ -568,8 +606,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when 
assignments has duplicate brokers")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], 
desc)
-      assertEquals("Duplicate brokers not allowed in replica assignment: 1, 1 
for partition id 3.",
-        e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The manual partition assignment includes the broker 1 more than once."
+      } else {
+        "Duplicate brokers not allowed in replica assignment: 1, 1 for 
partition id 3."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try assignments with differently sized inner lists
@@ -578,8 +620,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when 
assignments have differently sized inner lists")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], 
desc)
-      assertEquals("Inconsistent replication factor between partitions, 
partition 0 has 1 " +
-        "while partitions [4] have replication factors [2], respectively.", 
e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The manual partition assignment includes a partition with 2 
replica(s), but this is not " +
+          "consistent with previous partitions, which have 1 replica(s)."
+      } else {
+        "Inconsistent replication factor between partitions, partition 0 has 1 
" +
+          "while partitions [4] have replication factors [2], respectively."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try assignments with unknown brokers
@@ -588,7 +636,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when 
assignments contains an unknown broker")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], 
desc)
-      assertEquals("Unknown broker(s) in replica assignment: 12.", 
e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The manual partition assignment includes broker 12, but no such 
broker is registered."
+      } else {
+        "Unknown broker(s) in replica assignment: 12."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try with empty assignments
@@ -597,7 +650,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when 
assignments is empty")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], 
desc)
-      assertEquals("Increasing the number of partitions by 1 but 0 assignments 
provided.", e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "Attempted to add 1 additional partition(s), but only 0 assignment(s) 
were specified."
+      } else {
+        "Increasing the number of partitions by 1 but 0 assignments provided."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
     }
 
@@ -610,18 +668,30 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     TestUtils.waitUntilTrue(() => numPartitions(topic1) == 4, "Timed out 
waiting for new partitions to appear")
     var e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic2).get)
     assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
-    assertEquals("Topic currently has 3 partitions, which is higher than the 
requested 2.", e.getCause.getMessage)
+    val exceptionMsgStr = if (isKRaftTest()) {
+      "The topic create-partitions-topic-2 currently has 3 partition(s); 2 
would not be an increase."
+    } else {
+      "Topic currently has 3 partitions, which is higher than the requested 2."
+    }
+    assertEquals(exceptionMsgStr, e.getCause.getMessage)
     assertEquals(3, numPartitions(topic2))
 
-    // finally, try to add partitions to a topic queued for deletion
+    // Delete the topic. Verify addition of partitions to deleted topic is not 
possible. In
+    // Zookeeper mode, the topic is queued for deletion. In KRaft, the 
deletion occurs
+    // immediately and hence we have a different Exception thrown in the 
response.
     val deleteResult = client.deleteTopics(asList(topic1))
     deleteResult.topicNameValues.get(topic1).get
     alterResult = client.createPartitions(Map(topic1 ->
       NewPartitions.increaseTo(4)).asJava, validateOnly)
     e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic1).get,
-      () => "Expect InvalidTopicException when the topic is queued for 
deletion")
-    assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
-    assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
+      () => "Expect InvalidTopicException or UnknownTopicOrPartitionException 
when the topic is queued for deletion")
+    if (isKRaftTest()) {
+      assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], 
e.toString)
+      assertEquals("This server does not host this topic-partition.", 
e.getCause.getMessage)
+    } else {
+      assertTrue(e.getCause.isInstanceOf[InvalidTopicException], e.toString)
+      assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
+    }
   }
 
   @Test
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala 
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index ecd656e0fb..0ba22fdebc 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -30,6 +30,8 @@ class EmptyTestInfo extends TestInfo {
 }
 
 object TestInfoUtils {
+  final val TestWithParameterizedQuorumName = "{displayName}.quorum={0}"
+
   def isKRaft(testInfo: TestInfo): Boolean = {
     if (testInfo.getDisplayName().contains("quorum=")) {
       if (testInfo.getDisplayName().contains("quorum=kraft")) {
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index c8ce3a1605..906b7a3f58 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -20,9 +20,8 @@ package kafka.server
 import java.net.InetAddress
 import java.util
 import java.util.Collections.singletonList
-import java.util.Properties
+import java.util.{Collections, Properties}
 import java.util.concurrent.{CompletableFuture, ExecutionException}
-
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
@@ -58,6 +57,8 @@ import org.apache.kafka.server.authorizer.{Action, 
AuthorizableRequestContext, A
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
@@ -706,11 +707,10 @@ class ControllerApisTest {
         _ => Set("foo", "bar")))
   }
 
-  @Test
-  def testCreatePartitionsRequest(): Unit = {
-    val controller = new MockController.Builder().
-      newInitialTopic("foo", Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
-      newInitialTopic("bar", Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")).build()
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testCreatePartitionsRequest(validateOnly: Boolean): Unit = {
+    val controller = mock(classOf[Controller])
     val controllerApis = createControllerApis(None, controller)
     val request = new CreatePartitionsRequestData()
     request.topics().add(new 
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))
@@ -718,9 +718,23 @@ class ControllerApisTest {
     request.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
     request.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
     request.topics().add(new 
CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
+    request.setValidateOnly(validateOnly)
+
+    // Check if the controller is called correctly with the 'validateOnly' 
field set appropriately.
+    when(controller.createPartitions(
+      any(),
+      ArgumentMatchers.eq(
+        Collections.singletonList(
+          new 
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))),
+      ArgumentMatchers.eq(validateOnly))).thenReturn(CompletableFuture
+      .completedFuture(Collections.singletonList(
+        new CreatePartitionsTopicResult().setName("foo").
+          setErrorCode(NONE.code()).
+          setErrorMessage(null)
+      )))
     assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
-        setErrorCode(NONE.code()).
-        setErrorMessage(null),
+      setErrorCode(NONE.code()).
+      setErrorMessage(null),
       new CreatePartitionsTopicResult().setName("bar").
         setErrorCode(INVALID_REQUEST.code()).
         setErrorMessage("Duplicate topic name."),
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java 
b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 3d91f67b34..e7c656885b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -259,10 +259,16 @@ public interface Controller extends AclMutator, 
AutoCloseable {
      * @param deadlineNs    The time by which this operation needs to be 
complete, before
      *                      we will complete this operation with a timeout.
      * @param topics        The list of topics to create partitions for.
+     * @param validateOnly  If true, the request is validated, but no 
partitions will be created.
+     *
      * @return              A future yielding per-topic results.
      */
-    CompletableFuture<List<CreatePartitionsTopicResult>>
-            createPartitions(long deadlineNs, List<CreatePartitionsTopic> 
topics);
+    CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
+        long deadlineNs,
+        List<CreatePartitionsTopic> topics,
+        boolean validateOnly
+    );
+
 
     /**
      * Begin shutting down, but don't block.  You must still call close to 
clean up all
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 966cfa1dd3..c458df94af 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1563,13 +1563,25 @@ public final class QuorumController implements 
Controller {
     }
 
     @Override
-    public CompletableFuture<List<CreatePartitionsTopicResult>>
-            createPartitions(long deadlineNs, List<CreatePartitionsTopic> 
topics) {
+    public CompletableFuture<List<CreatePartitionsTopicResult>> 
createPartitions(
+        long deadlineNs,
+        List<CreatePartitionsTopic> topics,
+        boolean validateOnly
+    ) {
         if (topics.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
-        return appendWriteEvent("createPartitions", deadlineNs,
-            () -> replicationControl.createPartitions(topics));
+
+        return appendWriteEvent("createPartitions", deadlineNs, () -> {
+            final ControllerResult<List<CreatePartitionsTopicResult>> result = 
replicationControl.createPartitions(topics);
+            if (validateOnly) {
+                log.debug("Validate-only CreatePartitions result(s): {}", 
result.response());
+                return result.withoutRecords();
+            } else {
+                log.debug("CreatePartitions result(s): {}", result.response());
+                return result;
+            }
+        });
     }
 
     @Override
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index edf627eb51..a82f2851cb 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1178,7 +1178,7 @@ public class ReplicationControlManager {
                 setErrorCode(apiError.error().code()).
                 setErrorMessage(apiError.message()));
         }
-        return new ControllerResult<>(records, results, true);
+        return ControllerResult.atomicOf(records, results);
     }
 
     void createPartitions(CreatePartitionsTopic topic,
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 963c99eeff..08933d0e8e 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -782,7 +782,7 @@ public class QuorumControllerTest {
                     controller.findTopicNames(now, 
Collections.singletonList(Uuid.ZERO_UUID));
                 CompletableFuture<List<CreatePartitionsTopicResult>> 
createPartitionsFuture =
                     controller.createPartitions(now, Collections.singletonList(
-                        new CreatePartitionsTopic()));
+                        new CreatePartitionsTopic()), false);
                 CompletableFuture<ElectLeadersResponseData> electLeadersFuture 
=
                     controller.electLeaders(new 
ElectLeadersRequestData().setTimeoutMs(0).
                         setTopicPartitions(null));
@@ -836,7 +836,7 @@ public class QuorumControllerTest {
                 CompletableFuture<Map<Uuid, ResultOrError<String>>> 
findTopicNamesFuture =
                     controller.findTopicNames(deadlineMs, 
Collections.emptyList());
                 CompletableFuture<List<CreatePartitionsTopicResult>> 
createPartitionsFuture =
-                    controller.createPartitions(deadlineMs, 
Collections.emptyList());
+                    controller.createPartitions(deadlineMs, 
Collections.emptyList(), false);
                 CompletableFuture<ElectLeadersResponseData> electLeadersFuture 
=
                     controller.electLeaders(new 
ElectLeadersRequestData().setTimeoutMs(120000));
                 CompletableFuture<AlterPartitionReassignmentsResponseData> 
alterReassignmentsFuture =

Reply via email to