chia7712 commented on a change in pull request #10465:
URL: https://github.com/apache/kafka/pull/10465#discussion_r607623340



##########
File path: core/src/test/java/kafka/test/MockController.java
##########
@@ -77,8 +82,28 @@ private MockController(Collection<MockTopic> initialTopics) {
     }
 
     @Override
-    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
-        throw new UnsupportedOperationException();
+    synchronized public CompletableFuture<CreateTopicsResponseData>
+            createTopics(CreateTopicsRequestData request) {
+        CreateTopicsResponseData response = new CreateTopicsResponseData();
+        for (CreatableTopic topic : request.topics()) {
+            if (topics.containsKey(topic.name())) {
+                response.topics().add(new CreatableTopicResult().
+                    setName(topic.name()).
+                    setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()));
+            } else {
+                long topicId = nextTopicId.getAndAdd(1);
+                Uuid topicUuid = new Uuid(0, topicId);
+                topicNameToId.put(topic.name(), topicUuid);
+                topics.put(topicUuid, new MockTopic(topic.name(), topicUuid));
+                response.topics().add(new CreatableTopicResult().
+                    setName(topic.name()).
+                    setErrorCode(Errors.NONE.code()).
+                    setTopicId(topicUuid));
+                // For a better mock, we might want to return configs, replication

Review comment:
       Agreed. Why not include that information?

##########
File path: core/src/test/java/kafka/test/MockController.java
##########
@@ -77,8 +82,28 @@ private MockController(Collection<MockTopic> initialTopics) {
     }
 
     @Override
-    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
-        throw new UnsupportedOperationException();
+    synchronized public CompletableFuture<CreateTopicsResponseData>
+            createTopics(CreateTopicsRequestData request) {
+        CreateTopicsResponseData response = new CreateTopicsResponseData();
+        for (CreatableTopic topic : request.topics()) {
+            if (topics.containsKey(topic.name())) {

Review comment:
       The key type of `topics` is `Uuid`, so calling `containsKey` with the 
topic name can never match. Maybe this check should use `topicNameToId` instead.
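
       A minimal sketch of why the check misfires, under stated assumptions: 
`java.util.UUID` stands in for Kafka's `Uuid`, and the class and method names 
(`TopicLookupSketch`, `create`, `existsByIdMap`, `existsByNameMap`) are 
hypothetical, not part of `MockController`. It compiles because 
`Map.containsKey` accepts any `Object`, but a `String` never equals a UUID key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the two maps kept by the mock controller.
class TopicLookupSketch {
    private final Map<UUID, String> topics = new HashMap<>();        // id -> name
    private final Map<String, UUID> topicNameToId = new HashMap<>(); // name -> id
    private long nextTopicId = 0L;

    // Registers a topic; returns false if the name is already taken.
    public synchronized boolean create(String name) {
        if (topicNameToId.containsKey(name)) {
            return false;
        }
        UUID id = new UUID(0L, nextTopicId++);
        topicNameToId.put(name, id);
        topics.put(id, name);
        return true;
    }

    // The check as written in the diff: a String probed against UUID keys.
    // containsKey(Object) compiles, but the lookup can never succeed.
    public synchronized boolean existsByIdMap(String name) {
        return topics.containsKey(name);
    }

    // The suggested check: look the name up in the name-keyed map.
    public synchronized boolean existsByNameMap(String name) {
        return topicNameToId.containsKey(name);
    }

    public static void main(String[] args) {
        TopicLookupSketch sketch = new TopicLookupSketch();
        sketch.create("foo");
        System.out.println(sketch.existsByIdMap("foo"));   // false
        System.out.println(sketch.existsByNameMap("foo")); // true
    }
}
```

       With the id-keyed check, a second create for the same name would not be 
rejected with `TOPIC_ALREADY_EXISTS`; the name-keyed check rejects it.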

##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -32,10 +32,13 @@ import org.apache.kafka.common.Uuid.ZERO_UUID
 import org.apache.kafka.common.errors.{InvalidRequestException, NotControllerException, TopicDeletionDisabledException}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
-import org.apache.kafka.common.message.{BrokerRegistrationRequestData, DeleteTopicsRequestData}
+import org.apache.kafka.common.message.{BrokerRegistrationRequestData, CreateTopicsRequestData, DeleteTopicsRequestData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE, TOPIC_AUTHORIZATION_FAILED}

Review comment:
       `Errors.TOPIC_AUTHORIZATION_FAILED` still appears in this file. Now that 
`TOPIC_AUTHORIZATION_FAILED` is imported directly, could you remove the 
redundant `Errors.` prefix?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

