abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r563003389
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
.setPartitions(partitionData)
}
- private def createInternalTopic(topic: String): MetadataResponseTopic = {
- if (topic == null)
- throw new IllegalArgumentException("topic must not be null")
-
- val aliveBrokers = metadataCache.getAliveBrokers
-
- topic match {
- case GROUP_METADATA_TOPIC_NAME =>
- if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
- error(s"Number of alive brokers '${aliveBrokers.size}' does not meet
the required replication factor " +
- s"'${config.offsetsTopicReplicationFactor}' for the offsets topic
(configured via " +
- s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error
can be ignored if the cluster is starting up " +
- s"and not all brokers are up yet.")
- metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true,
util.Collections.emptyList())
- } else {
- createTopic(topic, config.offsetsTopicPartitions,
config.offsetsTopicReplicationFactor.toInt,
- groupCoordinator.offsetsTopicConfigs)
- }
- case TRANSACTION_STATE_TOPIC_NAME =>
- if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
- error(s"Number of alive brokers '${aliveBrokers.size}' does not meet
the required replication factor " +
- s"'${config.transactionTopicReplicationFactor}' for the
transactions state topic (configured via " +
- s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This
error can be ignored if the cluster is starting up " +
- s"and not all brokers are up yet.")
- metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true,
util.Collections.emptyList())
- } else {
- createTopic(topic, config.transactionTopicPartitions,
config.transactionTopicReplicationFactor.toInt,
- txnCoordinator.transactionTopicConfigs)
- }
- case _ => throw new IllegalArgumentException(s"Unexpected internal topic
name: $topic")
- }
- }
-
- private def getOrCreateInternalTopic(topic: String, listenerName:
ListenerName): MetadataResponseData.MetadataResponseTopic = {
- val topicMetadata = metadataCache.getTopicMetadata(Set(topic),
listenerName)
- topicMetadata.headOption.getOrElse(createInternalTopic(topic))
- }
-
- private def getTopicMetadata(allowAutoTopicCreation: Boolean,
isFetchAllMetadata: Boolean,
- topics: Set[String], listenerName: ListenerName,
+ private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+ isFetchAllMetadata: Boolean,
+ topics: Set[String],
+ listenerName: ListenerName,
errorUnavailableEndpoints: Boolean,
- errorUnavailableListeners: Boolean):
Seq[MetadataResponseTopic] = {
+ errorUnavailableListeners: Boolean):
(Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
if (topics.isEmpty || topicResponses.size == topics.size) {
- topicResponses
+ (topicResponses, Seq.empty[MetadataResponseTopic])
} else {
val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
- if (isInternal(topic)) {
- val topicMetadata = createInternalTopic(topic)
- Some(
- if (topicMetadata.errorCode ==
Errors.COORDINATOR_NOT_AVAILABLE.code)
- metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic,
true, util.Collections.emptyList())
- else
- topicMetadata
- )
- } else if (isFetchAllMetadata) {
+ if (isFetchAllMetadata) {
// A metadata request for all topics should never result in topic
auto creation, but a topic may be deleted
// in between the creation of the topics parameter and
topicResponses, so make sure to return None for this case.
None
- } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
- Some(createTopic(topic, config.numPartitions,
config.defaultReplicationFactor))
- } else {
- Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic,
false, util.Collections.emptyList()))
+ } else {
+ Some(metadataResponseTopic(
+ if (!hasEnoughAliveBrokers(topic))
+ Errors.INVALID_REPLICATION_FACTOR
+ else if (allowAutoTopicCreation && config.autoCreateTopicsEnable)
+ Errors.LEADER_NOT_AVAILABLE
Review comment:
That makes sense
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org