Repository: kafka
Updated Branches:
  refs/heads/trunk 3d9f34dd8 -> 063d534c5


KAFKA-3959: enforce offsets.topic.replication.factor upon __consumer_offsets auto topic creation (KIP-115)

Kafka brokers have a config called "offsets.topic.replication.factor" that
specifies the replication factor for the "__consumer_offsets" topic. The
problem is that this config isn't being enforced. If an attempt to create the
internal topic is made when there are fewer brokers than
"offsets.topic.replication.factor", the topic gets created anyway with the
current number of live brokers. This behavior is pretty surprising when you
have clients or tooling running as the cluster is being set up. Even if your
cluster ends up being huge, you'll find out much later that
__consumer_offsets was set up with no replication.

The cluster not meeting the "offsets.topic.replication.factor" requirement
on the internal topic is another way of saying the cluster isn't fully set
up yet.

The right behavior is for "offsets.topic.replication.factor" to be enforced.
Creation of the internal topic should fail with
GROUP_COORDINATOR_NOT_AVAILABLE until the
"offsets.topic.replication.factor" requirement is met. This closely
resembles the behavior of regular topic creation when the requested
replication factor exceeds the current size of the cluster: the request
fails with error INVALID_REPLICATION_FACTOR.
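
In effect, the enforcement amounts to the following check (a condensed
sketch of the KafkaApis.scala change shown in the diff below; names are
taken from the patch):

    // Refuse to auto-create __consumer_offsets while the cluster is still
    // smaller than the configured replication factor. The error is
    // retriable, so clients keep retrying until enough brokers are up.
    if (aliveBrokers.size < config.offsetsTopicReplicationFactor)
      new MetadataResponse.TopicMetadata(Errors.GROUP_COORDINATOR_NOT_AVAILABLE,
        Topic.GroupMetadataTopicName, true, java.util.Collections.emptyList())
    else
      createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
        config.offsetsTopicReplicationFactor.toInt, coordinator.offsetsTopicConfigs)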

Author: Onur Karaman <okara...@linkedin.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <ism...@juma.me.uk>, Ewen Cheslack-Postava <e...@confluent.io>

Closes #2177 from onurkaraman/KAFKA-3959


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/063d534c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/063d534c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/063d534c

Branch: refs/heads/trunk
Commit: 063d534c5160316cdf22e476d128e872a1412783
Parents: 3d9f34d
Author: Onur Karaman <okara...@linkedin.com>
Authored: Wed Feb 1 19:55:06 2017 -0800
Committer: Ewen Cheslack-Postava <m...@ewencp.org>
Committed: Wed Feb 1 19:55:06 2017 -0800

----------------------------------------------------------------------
 config/server.properties                        |  5 +++++
 .../src/main/scala/kafka/server/KafkaApis.scala | 20 ++++++++++++--------
 .../main/scala/kafka/server/KafkaConfig.scala   |  5 +----
 .../test/scala/unit/kafka/utils/TestUtils.scala |  1 +
 docs/upgrade.html                               |  1 +
 .../integration/utils/EmbeddedKafkaCluster.java |  1 +
 .../services/kafka/templates/kafka.properties   |  1 +
 7 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 506d0e7..37b5bb3 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -71,6 +71,11 @@ num.partitions=1
 # This value is recommended to be increased for installations with data dirs located in RAID array.
 num.recovery.threads.per.data.dir=1
 
+############################# Internal Topic Settings  #############################
+# The replication factor for the group metadata internal topic "__consumer_offsets".
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+
 ############################# Log Flush Policy #############################
 
 # Messages are immediately written to the filesystem but by default we only fsync() to sync
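
Once enough brokers are alive and the topic has been created, the effective
replication factor can be double-checked with the stock tooling (an example,
assuming the ZooKeeper-based kafka-topics.sh of this release and a local
ZooKeeper at localhost:2181):

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets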

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7bb3ed5..785c9ae 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -778,13 +778,13 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def createGroupMetadataTopic(): MetadataResponse.TopicMetadata = {
     val aliveBrokers = metadataCache.getAliveBrokers
-    val offsetsTopicReplicationFactor =
-      if (aliveBrokers.nonEmpty)
-        Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
-      else
-        config.offsetsTopicReplicationFactor.toInt
-    createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
-      offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
+    if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
+      new MetadataResponse.TopicMetadata(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Topic.GroupMetadataTopicName, true,
+        java.util.Collections.emptyList())
+    } else {
+      createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
+        config.offsetsTopicReplicationFactor.toInt, coordinator.offsetsTopicConfigs)
+    }
   }
 
   private def getOrCreateGroupMetadataTopic(listenerName: ListenerName): MetadataResponse.TopicMetadata = {
@@ -800,7 +800,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
         if (topic == Topic.GroupMetadataTopicName) {
-          createGroupMetadataTopic()
+          val topicMetadata = createGroupMetadataTopic()
+          if (topicMetadata.error() == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) {
+            new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, Topic.isInternal(topic),
+              java.util.Collections.emptyList())
+          } else topicMetadata
         } else if (config.autoCreateTopicsEnable) {
           createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
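
Note the error mapping in the second hunk: internally the coordinator path
reports the retriable GROUP_COORDINATOR_NOT_AVAILABLE, while a plain topic
metadata request translates that into INVALID_REPLICATION_FACTOR, so that
auto-creation of the internal topic fails the same way regular topic
creation does when the requested replication factor exceeds the cluster
size.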

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0f0b291..7946475 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -541,10 +541,7 @@ object KafkaConfig {
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit"
   val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache."
   val OffsetsTopicReplicationFactorDoc = "The replication factor for the offsets topic (set higher to ensure availability). " +
-  "To ensure that the effective replication factor of the offsets topic is the configured value, " +
-  "the number of alive brokers has to be at least the replication factor at the time of the " +
-  "first request for the offsets topic. If not, either the offsets topic creation will fail or " +
-  "it will get a replication factor of min(alive brokers, configured replication factor)"
+  "Internal topic creation will fail until the cluster size meets this replication factor requirement."
   val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)"
   val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
   val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits"

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 743756e..98daec1 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -217,6 +217,7 @@ object TestUtils extends Logging {
     props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString)
     props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100")
     props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152")
+    props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     rack.foreach(props.put(KafkaConfig.RackProp, _))
 
     if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ebc61db..da5c983 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -72,6 +72,7 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
         should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.</li>
     <li>Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to
         StreamsConfig class. Users should pay attention to the default values and set these if needed. For more details please refer to <a id="streamsconfigs" href="#streamsconfigs">3.5 Kafka Streams Configs</a>.</li>
+    <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li>
 </ul>
 
 <h5><a id="upgrade_1020_new_protocols" href="#upgrade_1020_new_protocols">New Protocol Versions</a></h5>

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index fe7bebc..656959a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -68,6 +68,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
 
         for (int i = 0; i < brokers.length; i++) {
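
Both test harnesses (TestUtils above and EmbeddedKafkaCluster here) pin the
internal topic's replication factor to 1: they typically start a single
broker, and with the broker default of 3 now enforced, auto-creation of
__consumer_offsets would otherwise fail with
GROUP_COORDINATOR_NOT_AVAILABLE.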

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 7f3a920..c971715 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -63,3 +63,4 @@ replica.lag.time.max.ms={{replica_lag}}
 {% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
 auto.create.topics.enable={{ auto_create_topics_enable }}
 {% endif %}
+offsets.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
\ No newline at end of file
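
The template expression evaluates to min(3, num_nodes): a single-node
system-test cluster renders offsets.topic.replication.factor=1, while a
five-node cluster renders offsets.topic.replication.factor=3.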
