[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-05 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570396673



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1113,82 +1097,36 @@ class KafkaApis(val requestChannel: RequestChannel,
   .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-if (topic == null)
-  throw new IllegalArgumentException("topic must not be null")
-
-val aliveBrokers = metadataCache.getAliveBrokers
-
-topic match {
-  case GROUP_METADATA_TOPIC_NAME =>
-if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-  error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-s"'${config.offsetsTopicReplicationFactor}' for the offsets topic 
(configured via " +
-s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error 
can be ignored if the cluster is starting up " +
-s"and not all brokers are up yet.")
-  metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-} else {
-  createTopic(topic, config.offsetsTopicPartitions, 
config.offsetsTopicReplicationFactor.toInt,
-groupCoordinator.offsetsTopicConfigs)
-}
-  case TRANSACTION_STATE_TOPIC_NAME =>
-if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-  error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-s"'${config.transactionTopicReplicationFactor}' for the 
transactions state topic (configured via " +
-s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This 
error can be ignored if the cluster is starting up " +
-s"and not all brokers are up yet.")
-  metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-} else {
-  createTopic(topic, config.transactionTopicPartitions, 
config.transactionTopicReplicationFactor.toInt,
-txnCoordinator.transactionTopicConfigs)
-}
-  case _ => throw new IllegalArgumentException(s"Unexpected internal topic 
name: $topic")
-}
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: 
ListenerName): MetadataResponseData.MetadataResponseTopic = {
-val topicMetadata = metadataCache.getTopicMetadata(Set(topic), 
listenerName)
-topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, 
isFetchAllMetadata: Boolean,
-   topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(isFetchAllMetadata: Boolean,
+   topics: Set[String],
+   listenerName: ListenerName,
errorUnavailableEndpoints: Boolean,
-   errorUnavailableListeners: Boolean): 
Seq[MetadataResponseTopic] = {
+   errorUnavailableListeners: Boolean): 
(Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
 val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
 errorUnavailableEndpoints, errorUnavailableListeners)
 
 if (topics.isEmpty || topicResponses.size == topics.size) {
-  topicResponses
+  (topicResponses, Seq.empty[MetadataResponseTopic])
 } else {
   val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
   val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-if (isInternal(topic)) {
-  val topicMetadata = createInternalTopic(topic)
-  Some(
-if (topicMetadata.errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code)
-  metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
-else
-  topicMetadata
-  )
-} else if (isFetchAllMetadata) {
+   if (isFetchAllMetadata) {

Review comment:
   Yes, I think so.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
 // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching 
listener was not found on the leader.
 // From version 6 onwards, we return LISTENER_NOT_FOUND to enable 
diagnosis of configuration errors.
 val errorUnavailableListeners = requestVersion >= 6
-val topicMetadata =
+val (topicMetadata, nonExistTopicMetadata) =
   if (authorizedTopics.isEmpty)
-Seq.empty[MetadataResponseTopic]
-  else {
-getTopicMetadata(
-  metadataRequest.allowAutoTopicCreation,
-  metadataRequest.isAllTopics,
-  authorizedTopics,
-  

[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570654953



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
   Sounds good





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570568151



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-  numPartitions: Int,
-  replicationFactor: Int,
-  properties: util.Properties = new 
util.Properties()): MetadataResponseTopic = {
-try {
-  adminZkClient.createTopic(topic, numPartitions, replicationFactor, 
properties, RackAwareMode.Safe)
-  info("Auto creation of topic %s with %d partitions and replication 
factor %d is successful"
-.format(topic, numPartitions, replicationFactor))
-  metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, 
isInternal(topic), util.Collections.emptyList())
-} catch {
-  case _: TopicExistsException => // let it go, possibly another broker 
created this topic

Review comment:
   The problem we have is that `ZkAdminManager.createTopics` only takes a 
callback instead of responding to you in realtime whether we hit TopicExists. 
Right now we are doing the topic creation async, so unless this is necessary to 
be fixed (which today we would just return UNKNOWN_PARTITION which seems to be 
semantically similar to LEADER_NOT_AVAILABLE), I think we could just returning 
unknown partition immediately without waiting for the async creation?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570492431



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
 // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching 
listener was not found on the leader.
 // From version 6 onwards, we return LISTENER_NOT_FOUND to enable 
diagnosis of configuration errors.
 val errorUnavailableListeners = requestVersion >= 6
-val topicMetadata =
+val (topicMetadata, nonExistTopicMetadata) =
   if (authorizedTopics.isEmpty)
-Seq.empty[MetadataResponseTopic]
-  else {
-getTopicMetadata(
-  metadataRequest.allowAutoTopicCreation,
-  metadataRequest.isAllTopics,
-  authorizedTopics,
-  request.context.listenerName,
-  errorUnavailableEndpoints,
-  errorUnavailableListeners
-)
+(Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+  else
+getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+  request.context.listenerName, errorUnavailableEndpoints, 
errorUnavailableListeners)
+
+nonExistTopicMetadata.foreach(metadata =>
+  try {
+// Validate topic name and propagate error if failed
+Topic.validate(metadata.name())
+  } catch {
+case e: Exception =>
+  metadata.setErrorCode(Errors.forException(e).code)
   }
+)
+
+if (nonExistTopicMetadata.nonEmpty && 
metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) {
+  val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+  autoTopicCreationManager.createTopics(
+nonExistTopicMetadata.map(metadata => 
getTopicConfigs(metadata.name())).toSet, controllerMutationQuota)

Review comment:
   I guess we could rely on admin manager to do the validation for us.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570492221



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+topicNames: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+config: KafkaConfig,
+metadataCache: MetadataCache,
+time: Time,
+metrics: Metrics,
+threadNamePrefix: Option[String],
+adminManager: ZkAdminManager,
+controller: KafkaController,
+enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+val channelManager =
+  if (enableForwarding)
+Some(new BrokerToControllerChannelManager(
+  controllerNodeProvider = MetadataCacheControllerNodeProvider(
+config, metadataCache),
+  time = time,
+  metrics = metrics,
+  config = config,
+  channelName = "autoTopicCreationChannel",
+  threadNamePrefix = threadNamePrefix,
+  retryTimeoutMs = config.requestTimeoutMs.longValue
+))
+  else
+None
+new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, 
config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota): 
Unit = {
+val topicConfigs = topics
+  .filter(topic => !inflightTopics.contains(topic.name()))

Review comment:
   You mean omit () for `topic.name()`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570480399



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
 // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching 
listener was not found on the leader.
 // From version 6 onwards, we return LISTENER_NOT_FOUND to enable 
diagnosis of configuration errors.
 val errorUnavailableListeners = requestVersion >= 6
-val topicMetadata =
+val (topicMetadata, nonExistTopicMetadata) =
   if (authorizedTopics.isEmpty)
-Seq.empty[MetadataResponseTopic]
-  else {
-getTopicMetadata(
-  metadataRequest.allowAutoTopicCreation,
-  metadataRequest.isAllTopics,
-  authorizedTopics,
-  request.context.listenerName,
-  errorUnavailableEndpoints,
-  errorUnavailableListeners
-)
+(Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+  else
+getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+  request.context.listenerName, errorUnavailableEndpoints, 
errorUnavailableListeners)
+
+nonExistTopicMetadata.foreach(metadata =>
+  try {
+// Validate topic name and propagate error if failed
+Topic.validate(metadata.name())

Review comment:
   Actually after looking into the zk admin manager logic, I don't think 
it's necessary to do the topic validation here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570396673



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1113,82 +1097,36 @@ class KafkaApis(val requestChannel: RequestChannel,
   .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-if (topic == null)
-  throw new IllegalArgumentException("topic must not be null")
-
-val aliveBrokers = metadataCache.getAliveBrokers
-
-topic match {
-  case GROUP_METADATA_TOPIC_NAME =>
-if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-  error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-s"'${config.offsetsTopicReplicationFactor}' for the offsets topic 
(configured via " +
-s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error 
can be ignored if the cluster is starting up " +
-s"and not all brokers are up yet.")
-  metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-} else {
-  createTopic(topic, config.offsetsTopicPartitions, 
config.offsetsTopicReplicationFactor.toInt,
-groupCoordinator.offsetsTopicConfigs)
-}
-  case TRANSACTION_STATE_TOPIC_NAME =>
-if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-  error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-s"'${config.transactionTopicReplicationFactor}' for the 
transactions state topic (configured via " +
-s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This 
error can be ignored if the cluster is starting up " +
-s"and not all brokers are up yet.")
-  metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-} else {
-  createTopic(topic, config.transactionTopicPartitions, 
config.transactionTopicReplicationFactor.toInt,
-txnCoordinator.transactionTopicConfigs)
-}
-  case _ => throw new IllegalArgumentException(s"Unexpected internal topic 
name: $topic")
-}
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: 
ListenerName): MetadataResponseData.MetadataResponseTopic = {
-val topicMetadata = metadataCache.getTopicMetadata(Set(topic), 
listenerName)
-topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, 
isFetchAllMetadata: Boolean,
-   topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(isFetchAllMetadata: Boolean,
+   topics: Set[String],
+   listenerName: ListenerName,
errorUnavailableEndpoints: Boolean,
-   errorUnavailableListeners: Boolean): 
Seq[MetadataResponseTopic] = {
+   errorUnavailableListeners: Boolean): 
(Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
 val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
 errorUnavailableEndpoints, errorUnavailableListeners)
 
 if (topics.isEmpty || topicResponses.size == topics.size) {
-  topicResponses
+  (topicResponses, Seq.empty[MetadataResponseTopic])
 } else {
   val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
   val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-if (isInternal(topic)) {
-  val topicMetadata = createInternalTopic(topic)
-  Some(
-if (topicMetadata.errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code)
-  metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
-else
-  topicMetadata
-  )
-} else if (isFetchAllMetadata) {
+   if (isFetchAllMetadata) {

Review comment:
   Yes, I think so.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-03 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569929139



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+topicNames: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+config: KafkaConfig,
+metadataCache: MetadataCache,
+time: Time,
+metrics: Metrics,
+threadNamePrefix: Option[String],
+adminManager: ZkAdminManager,
+controller: KafkaController,
+enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+val channelManager =
+  if (enableForwarding)
+Some(new BrokerToControllerChannelManager(
+  controllerNodeProvider = MetadataCacheControllerNodeProvider(
+config, metadataCache),
+  time = time,
+  metrics = metrics,
+  config = config,
+  channelName = "autoTopicCreationChannel",
+  threadNamePrefix = threadNamePrefix,
+  retryTimeoutMs = config.requestTimeoutMs.longValue
+))
+  else
+None
+new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, 
config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota): 
Unit = {
+val topicConfigs = topics
+  .filter(topic => !inflightTopics.contains(topic.name()))
+  .map(topic => {(topic.name(), topic)}).toMap
+
+if (topicConfigs.nonEmpty) {
+  if (!controller.isActive && channelManager.isDefined) {
+// Mark the topics as inflight during auto creation through forwarding.
+topicConfigs.foreach(config => inflightTopics.put(config._1, 
config._2))
+
+val topicsToCreate = new 
CreateTopicsRequestData.CreatableTopicCollection
+topicConfigs.foreach(config => topicsToCreate.add(config._2))
+val createTopicsRequest = new CreateTopicsRequest.Builder(
+  new CreateTopicsRequestData()
+.setTimeoutMs(requestTimeout)
+.setTopics(topicsToCreate)
+)
+
+channelManager.get.sendRequest(createTopicsRequest, new 
ControllerRequestCompletionHandler {
+  override def onTimeout(): Unit = {
+clearInflightRequests(topicConfigs)
+  }
+
+  override def onComplete(response: ClientResponse): Unit = {
+clearInflightRequests(topicConfigs)
+  }
+})
+  } else {
+adminManager.createTopics(
+  requestTimeout,
+  validateOnly = false,
+  topicConfigs,
+  Map.empty,
+  controllerMutationQuota,
+  _ => ())
+  }
+} else {
+  debug(s"Topics $topics are under creation already, skip sending 
additional " +

Review comment:
   Actually I don't think this logging is very useful, will replace it with 
something more explicit about state change.





[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-03 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569926897



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+topicNames: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+config: KafkaConfig,
+metadataCache: MetadataCache,
+time: Time,
+metrics: Metrics,
+threadNamePrefix: Option[String],
+adminManager: ZkAdminManager,
+controller: KafkaController,
+enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+val channelManager =
+  if (enableForwarding)
+Some(new BrokerToControllerChannelManager(

Review comment:
   Sure, we do have https://issues.apache.org/jira/browse/KAFKA-10348 to 
track.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-03 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569913506



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {

Review comment:
   Seems no longer in use, will remove and revert back to using error code 
message.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-03 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569912931



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))

Review comment:
   Makes sense





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-03 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569706514



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+topicNames: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+config: KafkaConfig,
+metadataCache: MetadataCache,
+time: Time,
+metrics: Metrics,
+threadNamePrefix: Option[String],
+adminManager: ZkAdminManager,
+controller: KafkaController,
+enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+val channelManager =
+  if (enableForwarding)
+Some(new BrokerToControllerChannelManager(
+  controllerNodeProvider = MetadataCacheControllerNodeProvider(
+config, metadataCache),
+  time = time,
+  metrics = metrics,
+  config = config,
+  channelName = "autoTopicCreationChannel",
+  threadNamePrefix = threadNamePrefix,
+  retryTimeoutMs = config.requestTimeoutMs.longValue
+))
+  else
+None
+new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, 
config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]

Review comment:
   Seems I could omit the value here: 
https://stackoverflow.com/questions/40993683/scala-thread-safe-hashset





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-27 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {
+  if (shouldForwardRequest(request)) {
+forwardingManager.sendInterBrokerRequest(
+  getCreateTopicsRequest(Seq(internalTopicName)),
+  _ => ())
+  } else {
+val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+val topicConfigs = Map(internalTopicName -> 
getTopicConfigs(internalTopicName))
+adminManager.createTopics(
+  config.requestTimeoutMs,
+  validateOnly = false,
+  topicConfigs,
+  Map.empty,
+  controllerMutationQuota,
+  _ => ())
+  }
 }
-val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
-} else {
-  val coordinatorEndpoint = topicMetadata.partitions.asScala
-.find(_.partitionIndex == partition)
-.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-.flatMap(metadata => 
metadataCache.getAliveBroker(metadata.leaderId))
-.flatMap(_.getNode(request.context.listenerName))
-.filterNot(_.isEmpty)
-
-  coordinatorEndpoint match {
-case Some(endpoint) =>
-  createFindCoordinatorResponse(Errors.NONE, endpoint)
-case _ =>
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
+
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
createFindCoordinatorResponse(
+  Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+  } else {
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  val responseBody = if (topicMetadata.head.errorCode != 
Errors.NONE.code) {
+   

[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-26 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r565035383



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -61,7 +61,6 @@ class BrokerToControllerChannelManager(
 
   def shutdown(): Unit = {
 requestThread.shutdown()
-requestThread.awaitShutdown()

Review comment:
   Side cleanup which is called within `requestThread.shutdown()`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-22 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r563003389



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
   .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-if (topic == null)
-  throw new IllegalArgumentException("topic must not be null")
-
-val aliveBrokers = metadataCache.getAliveBrokers
-
-topic match {
-  case GROUP_METADATA_TOPIC_NAME =>
-if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-  error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-s"'${config.offsetsTopicReplicationFactor}' for the offsets topic 
(configured via " +
-s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error 
can be ignored if the cluster is starting up " +
-s"and not all brokers are up yet.")
-  metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-} else {
-  createTopic(topic, config.offsetsTopicPartitions, 
config.offsetsTopicReplicationFactor.toInt,
-groupCoordinator.offsetsTopicConfigs)
-}
-  case TRANSACTION_STATE_TOPIC_NAME =>
-if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-  error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-s"'${config.transactionTopicReplicationFactor}' for the 
transactions state topic (configured via " +
-s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This 
error can be ignored if the cluster is starting up " +
-s"and not all brokers are up yet.")
-  metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-} else {
-  createTopic(topic, config.transactionTopicPartitions, 
config.transactionTopicReplicationFactor.toInt,
-txnCoordinator.transactionTopicConfigs)
-}
-  case _ => throw new IllegalArgumentException(s"Unexpected internal topic 
name: $topic")
-}
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: 
ListenerName): MetadataResponseData.MetadataResponseTopic = {
-val topicMetadata = metadataCache.getTopicMetadata(Set(topic), 
listenerName)
-topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, 
isFetchAllMetadata: Boolean,
-   topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+   isFetchAllMetadata: Boolean,
+   topics: Set[String],
+   listenerName: ListenerName,
errorUnavailableEndpoints: Boolean,
-   errorUnavailableListeners: Boolean): 
Seq[MetadataResponseTopic] = {
+   errorUnavailableListeners: Boolean): 
(Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
 val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
 errorUnavailableEndpoints, errorUnavailableListeners)
 
 if (topics.isEmpty || topicResponses.size == topics.size) {
-  topicResponses
+  (topicResponses, Seq.empty[MetadataResponseTopic])
 } else {
   val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
   val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-if (isInternal(topic)) {
-  val topicMetadata = createInternalTopic(topic)
-  Some(
-if (topicMetadata.errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code)
-  metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
-else
-  topicMetadata
-  )
-} else if (isFetchAllMetadata) {
+   if (isFetchAllMetadata) {
   // A metadata request for all topics should never result in topic 
auto creation, but a topic may be deleted
   // in between the creation of the topics parameter and 
topicResponses, so make sure to return None for this case.
   None
-} else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
-  Some(createTopic(topic, config.numPartitions, 
config.defaultReplicationFactor))
-} else {
-  Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, 
false, util.Collections.emptyList()))
+   } else {
+Some(metadataResponseTopic(
+  if (!hasEnoughAliveBrokers(topic))
+Errors.INVALID_REPLICATION_FACTOR
+  else if 

[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-22 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r563002911



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-  numPartitions: Int,
-  replicationFactor: Int,
-  properties: util.Properties = new 
util.Properties()): MetadataResponseTopic = {
-try {
-  adminZkClient.createTopic(topic, numPartitions, replicationFactor, 
properties, RackAwareMode.Safe)
-  info("Auto creation of topic %s with %d partitions and replication 
factor %d is successful"
-.format(topic, numPartitions, replicationFactor))
-  metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, 
isInternal(topic), util.Collections.emptyList())
-} catch {
-  case _: TopicExistsException => // let it go, possibly another broker 
created this topic

Review comment:
   We intentionally avoid using adminZkClient so that we could go through 
topic creation rules through `zkAdminManager`. TopicExistsException is handled 
there. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-22 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r562792214



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
   I guess the purpose of doing live broker check here is to avoid sending 
excessive create topic requests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-21 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {
+  if (shouldForwardRequest(request)) {
+forwardingManager.sendInterBrokerRequest(
+  getCreateTopicsRequest(Seq(internalTopicName)),
+  _ => ())
+  } else {
+val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+val topicConfigs = Map(internalTopicName -> 
getTopicConfigs(internalTopicName))
+adminManager.createTopics(
+  config.requestTimeoutMs,
+  validateOnly = false,
+  topicConfigs,
+  Map.empty,
+  controllerMutationQuota,
+  _ => ())
+  }
 }
-val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
-} else {
-  val coordinatorEndpoint = topicMetadata.partitions.asScala
-.find(_.partitionIndex == partition)
-.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-.flatMap(metadata => 
metadataCache.getAliveBroker(metadata.leaderId))
-.flatMap(_.getNode(request.context.listenerName))
-.filterNot(_.isEmpty)
-
-  coordinatorEndpoint match {
-case Some(endpoint) =>
-  createFindCoordinatorResponse(Errors.NONE, endpoint)
-case _ =>
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
+
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
createFindCoordinatorResponse(
+  Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+  } else {
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  val responseBody = if (topicMetadata.head.errorCode != 
Errors.NONE.code) {
+   

[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-20 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {
+  if (shouldForwardRequest(request)) {
+forwardingManager.sendInterBrokerRequest(
+  getCreateTopicsRequest(Seq(internalTopicName)),
+  _ => ())
+  } else {
+val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+val topicConfigs = Map(internalTopicName -> 
getTopicConfigs(internalTopicName))
+adminManager.createTopics(
+  config.requestTimeoutMs,
+  validateOnly = false,
+  topicConfigs,
+  Map.empty,
+  controllerMutationQuota,
+  _ => ())
+  }
 }
-val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
-} else {
-  val coordinatorEndpoint = topicMetadata.partitions.asScala
-.find(_.partitionIndex == partition)
-.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-.flatMap(metadata => 
metadataCache.getAliveBroker(metadata.leaderId))
-.flatMap(_.getNode(request.context.listenerName))
-.filterNot(_.isEmpty)
-
-  coordinatorEndpoint match {
-case Some(endpoint) =>
-  createFindCoordinatorResponse(Errors.NONE, endpoint)
-case _ =>
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
+
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
createFindCoordinatorResponse(
+  Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+  } else {
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  val responseBody = if (topicMetadata.head.errorCode != 
Errors.NONE.code) {
+