Repository: kafka Updated Branches: refs/heads/trunk 3d9f34dd8 -> 063d534c5
KAFKA-3959: enforce offsets.topic.replication.factor upon __consumer_offsets auto topic creation (KIP-115) Kafka brokers have a config called "offsets.topic.replication.factor" that specifies the replication factor for the "__consumer_offsets" topic. The problem is that this config isn't being enforced. If an attempt to create the internal topic is made when there are fewer brokers than "offsets.topic.replication.factor", the topic ends up getting created anyway with the current number of live brokers. The current behavior is pretty surprising when you have clients or tooling running as the cluster is getting set up. Even if your cluster ends up being huge, you'll find out much later that __consumer_offsets was set up with no replication. The cluster not meeting the "offsets.topic.replication.factor" requirement on the internal topic is another way of saying the cluster isn't fully set up yet. The right behavior should be for "offsets.topic.replication.factor" to be enforced. Topic creation of the internal topic should fail with GROUP_COORDINATOR_NOT_AVAILABLE until the "offsets.topic.replication.factor" requirement is met. This closely resembles the behavior of regular topic creation when the requested replication factor exceeds the current size of the cluster, as the request fails with error INVALID_REPLICATION_FACTOR. 
Author: Onur Karaman <okara...@linkedin.com> Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <ism...@juma.me.uk>, Ewen Cheslack-Postava <e...@confluent.io> Closes #2177 from onurkaraman/KAFKA-3959 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/063d534c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/063d534c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/063d534c Branch: refs/heads/trunk Commit: 063d534c5160316cdf22e476d128e872a1412783 Parents: 3d9f34d Author: Onur Karaman <okara...@linkedin.com> Authored: Wed Feb 1 19:55:06 2017 -0800 Committer: Ewen Cheslack-Postava <m...@ewencp.org> Committed: Wed Feb 1 19:55:06 2017 -0800 ---------------------------------------------------------------------- config/server.properties | 5 +++++ .../src/main/scala/kafka/server/KafkaApis.scala | 20 ++++++++++++-------- .../main/scala/kafka/server/KafkaConfig.scala | 5 +---- .../test/scala/unit/kafka/utils/TestUtils.scala | 1 + docs/upgrade.html | 1 + .../integration/utils/EmbeddedKafkaCluster.java | 1 + .../services/kafka/templates/kafka.properties | 1 + 7 files changed, 22 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/config/server.properties ---------------------------------------------------------------------- diff --git a/config/server.properties b/config/server.properties index 506d0e7..37b5bb3 100644 --- a/config/server.properties +++ b/config/server.properties @@ -71,6 +71,11 @@ num.partitions=1 # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 +############################# Internal Topic Settings ############################# +# The replication factor for the group metadata internal topic "__consumer_offsets". 
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. +offsets.topic.replication.factor=1 + ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7bb3ed5..785c9ae 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -778,13 +778,13 @@ class KafkaApis(val requestChannel: RequestChannel, private def createGroupMetadataTopic(): MetadataResponse.TopicMetadata = { val aliveBrokers = metadataCache.getAliveBrokers - val offsetsTopicReplicationFactor = - if (aliveBrokers.nonEmpty) - Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length) - else - config.offsetsTopicReplicationFactor.toInt - createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions, - offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs) + if (aliveBrokers.size < config.offsetsTopicReplicationFactor) { + new MetadataResponse.TopicMetadata(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Topic.GroupMetadataTopicName, true, + java.util.Collections.emptyList()) + } else { + createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions, + config.offsetsTopicReplicationFactor.toInt, coordinator.offsetsTopicConfigs) + } } private def getOrCreateGroupMetadataTopic(listenerName: ListenerName): MetadataResponse.TopicMetadata = { @@ -800,7 +800,11 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => if 
(topic == Topic.GroupMetadataTopicName) { - createGroupMetadataTopic() + val topicMetadata = createGroupMetadataTopic() + if (topicMetadata.error() == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) { + new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, Topic.isInternal(topic), + java.util.Collections.emptyList()) + } else topicMetadata } else if (config.autoCreateTopicsEnable) { createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0f0b291..7946475 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -541,10 +541,7 @@ object KafkaConfig { val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit" val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache." val OffsetsTopicReplicationFactorDoc = "The replication factor for the offsets topic (set higher to ensure availability). " + - "To ensure that the effective replication factor of the offsets topic is the configured value, " + - "the number of alive brokers has to be at least the replication factor at the time of the " + - "first request for the offsets topic. If not, either the offsets topic creation will fail or " + - "it will get a replication factor of min(alive brokers, configured replication factor)" + "Internal topic creation will fail until the cluster size meets this replication factor requirement." 
val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)" val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads" val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits" http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 743756e..98daec1 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -217,6 +217,7 @@ object TestUtils extends Logging { props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString) props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100") props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152") + props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") rack.foreach(props.put(KafkaConfig.RackProp, _)) if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) }) http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index ebc61db..da5c983 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -72,6 +72,7 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8. should not be set in the Streams app any more. 
If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.</li> <li>Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to StreamsConfig class. User should pay attention to the default values and set these if needed. For more details please refer to <a id="streamsconfigs" href="#streamsconfigs">3.5 Kafka Streams Configs</a>.</li> + <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li> </ul> <h5><a id="upgrade_1020_new_protocols" href="#upgrade_1020_new_protocols">New Protocol Versions</a></h5> http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index fe7bebc..656959a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -68,6 +68,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L); putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1); putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true); for (int i = 0; i < brokers.length; i++) { http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/tests/kafkatest/services/kafka/templates/kafka.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 7f3a920..c971715 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -63,3 +63,4 @@ replica.lag.time.max.ms={{replica_lag}} {% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %} auto.create.topics.enable={{ auto_create_topics_enable }} {% endif %} +offsets.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }} \ No newline at end of file