Repository: kafka Updated Branches: refs/heads/0.8.2 133a4fb75 -> 4546b9dba
kafka-1555; provide strong consistency with reasonable availability; patched by Gwen Shapira; reviewed by Joel Koshy and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4546b9db Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4546b9db Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4546b9db Branch: refs/heads/0.8.2 Commit: 4546b9dba4814acb2e38cabd19dfd5c9b38524ee Parents: 133a4fb Author: Gwen Shapira <[email protected]> Authored: Mon Oct 13 16:48:26 2014 -0700 Committer: Jun Rao <[email protected]> Committed: Mon Oct 13 16:48:26 2014 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/ProducerConfig.java | 17 ++++-- .../apache/kafka/common/config/ConfigDef.java | 42 +++++++++++++ .../NotEnoughReplicasAfterAppendException.java | 43 ++++++++++++++ .../errors/NotEnoughReplicasException.java | 40 +++++++++++++ .../apache/kafka/common/protocol/Errors.java | 7 ++- .../main/scala/kafka/cluster/Partition.scala | 27 ++++++++- .../main/scala/kafka/common/ErrorMapping.scala | 20 ++++--- .../NotEnoughReplicasAfterAppendException.scala | 27 +++++++++ .../common/NotEnoughReplicasException.scala | 25 ++++++++ core/src/main/scala/kafka/log/LogConfig.scala | 62 +++++++++++++------- .../kafka/producer/SyncProducerConfig.scala | 14 +++-- .../src/main/scala/kafka/server/KafkaApis.scala | 6 +- .../main/scala/kafka/server/KafkaConfig.scala | 5 ++ .../kafka/api/ProducerFailureHandlingTest.scala | 57 +++++++++++++++++- .../unit/kafka/producer/ProducerTest.scala | 27 ++++----- .../unit/kafka/producer/SyncProducerTest.scala | 22 +++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 10 +++- 17 files changed, 384 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 9e2e280..bf4ed66 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -14,7 +14,10 @@ package org.apache.kafka.clients.producer; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.ValidString.in; +import java.util.Arrays; +import java.util.List; import java.util.Map; import org.apache.kafka.common.config.AbstractConfig; @@ -77,7 +80,8 @@ public class ProducerConfig extends AbstractConfig { /** <code>acks</code> */ public static final String ACKS_CONFIG = "acks"; - private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + " durability of records that are sent. The following settings are common: " + private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + + " durability of records that are sent. The following settings are common: " + " <ul>" + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the" + " server at all. The record will be immediately added to the socket buffer and considered sent. 
No guarantee can be" @@ -89,9 +93,7 @@ public class ProducerConfig extends AbstractConfig { + " acknowledging the record but before the followers have replicated it then the record will be lost." + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to" + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" - + " remains alive. This is the strongest available guarantee." - + " <li>Other settings such as <code>acks=2</code> are also possible, and will require the given number of" - + " acknowledgements but this is generally less useful."; + + " remains alive. This is the strongest available guarantee."; /** <code>timeout.ms</code> */ public static final String TIMEOUT_CONFIG = "timeout.ms"; @@ -175,7 +177,12 @@ public class ProducerConfig extends AbstractConfig { config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) - .define(ACKS_CONFIG, Type.STRING, "1", Importance.HIGH, ACKS_DOC) + .define(ACKS_CONFIG, + Type.STRING, + "1", + in(Arrays.asList("all","-1", "0", "1")), + Importance.HIGH, + ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index addc906..227309e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -268,6 +268,48 @@ public class ConfigDef { } } + public static class ValidString implements Validator { + List<String> validStrings; + + private ValidString(List<String> validStrings) { + this.validStrings = validStrings; + } + + public static ValidString in(List<String> validStrings) { + return new ValidString(validStrings); + } + + @Override + public void ensureValid(String name, Object o) { + + String s = (String) o; + + if (!validStrings.contains(s)) { + throw new ConfigException(name,o,"String must be one of:" +join(validStrings)); + } + + } + + public String toString() { + return "[" + join(validStrings) + "]"; + } + + private String join(List<String> list) + { + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (String item : list) + { + if (first) + first = false; + else + sb.append(","); + sb.append(item); + } + return sb.toString(); + } + } + private static class ConfigKey { public final String name; public final Type type; http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java new file mode 100644 index 0000000..75c80a9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Number of insync replicas for the partition is lower than min.insync.replicas + * This exception is raised when the low ISR size is discovered *after* the message + * was already appended to the log. Producer retries will cause duplicates. + */ +public class NotEnoughReplicasAfterAppendException extends RetriableException { + private static final long serialVersionUID = 1L; + + public NotEnoughReplicasAfterAppendException() { + super(); + } + + public NotEnoughReplicasAfterAppendException(String message, Throwable cause) { + super(message,cause); + } + + public NotEnoughReplicasAfterAppendException(String message) { + super(message); + } + + public NotEnoughReplicasAfterAppendException(Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java new file mode 100644 index 0000000..486d515 --- /dev/null +++ 
b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Number of insync replicas for the partition is lower than min.insync.replicas + */ +public class NotEnoughReplicasException extends RetriableException { + private static final long serialVersionUID = 1L; + + public NotEnoughReplicasException() { + super(); + } + + public NotEnoughReplicasException(String message, Throwable cause) { + super(message, cause); + } + + public NotEnoughReplicasException(String message) { + super(message); + } + + public NotEnoughReplicasException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index d5f5de3..3316b6a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.errors.*; /** * This class contains all the client-server errors--those errors that must be sent from the server to the client. These * are thus part of the protocol. The names can be changed but the error code cannot. - * + * * Do not add exceptions that occur only on the client or only on the server here. */ public enum Errors { @@ -44,8 +44,9 @@ public enum Errors { NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")), // TODO: errorCode 14, 15, 16 INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), - RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")); - + RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")), + NOT_ENOUGH_REPLICAS(19, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")), + NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")); private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>(); private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>(); static { http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index ff106b4..e88ecf2 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -269,14 +269,26 @@ 
class Partition(val topic: String, else true /* also count the local (leader) replica */ }) + val minIsr = leaderReplica.log.get.config.minInSyncReplicas + trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId)) - if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) || - (requiredAcks > 0 && numAcks >= requiredAcks)) { + if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset ) { /* * requiredAcks < 0 means acknowledge after all replicas in ISR * are fully caught up to the (local) leader's offset * corresponding to this produce request. + * + * minIsr means that the topic is configured not to accept messages + * if there are not enough replicas in ISR + * in this scenario the request was already appended locally and + * then added to the purgatory before the ISR was shrunk */ + if (minIsr <= curInSyncReplicas.size) { + (true, ErrorMapping.NoError) + } else { + (true, ErrorMapping.NotEnoughReplicasAfterAppendCode) + } + } else if (requiredAcks > 0 && numAcks >= requiredAcks) { (true, ErrorMapping.NoError) } else (false, ErrorMapping.NoError) @@ -350,12 +362,21 @@ class Partition(val topic: String, stuckReplicas ++ slowReplicas } - def appendMessagesToLeader(messages: ByteBufferMessageSet) = { + def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = { inReadLock(leaderIsrUpdateLock) { val leaderReplicaOpt = leaderReplicaIfLocal() leaderReplicaOpt match { case Some(leaderReplica) => val log = leaderReplica.log.get + val minIsr = log.config.minInSyncReplicas + val inSyncSize = inSyncReplicas.size + + // Avoid writing to leader if there are not enough insync replicas to make it safe + if (inSyncSize < minIsr && requiredAcks == -1) { + throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]" + .format(topic,partitionId,inSyncSize,minIsr)) + } + val info = log.append(messages, 
assignOffsets = true) // probably unblock some follower fetch requests since log end offset has been updated replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId)) http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/main/scala/kafka/common/ErrorMapping.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index a190607..880ab4a 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import scala.Predef._ /** - * A bi-directional mapping between error codes and exceptions x + * A bi-directional mapping between error codes and exceptions */ object ErrorMapping { val EmptyByteBuffer = ByteBuffer.allocate(0) @@ -47,8 +47,10 @@ object ErrorMapping { val NotCoordinatorForConsumerCode: Short = 16 val InvalidTopicCode : Short = 17 val MessageSetSizeTooLargeCode: Short = 18 + val NotEnoughReplicasCode : Short = 19 + val NotEnoughReplicasAfterAppendCode: Short = 20 - private val exceptionToCode = + private val exceptionToCode = Map[Class[Throwable], Short]( classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode, classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode, @@ -66,15 +68,17 @@ object ErrorMapping { classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode, 
classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode, classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode, - classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode + classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode, + classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode, + classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode ).withDefaultValue(UnknownCode) - + /* invert the mapping */ - private val codeToException = + private val codeToException = (Map[Short, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException]) - + def codeFor(exception: Class[Throwable]): Short = exceptionToCode(exception) - + def maybeThrowException(code: Short) = if(code != 0) throw codeToException(code).newInstance() http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala new file mode 100644 index 0000000..c4f9def --- /dev/null +++ b/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Number of insync replicas for the partition is lower than min.insync.replicas + * This exception is raised when the low ISR size is discovered *after* the message + * was already appended to the log. Producer retries will cause duplicates. + */ +class NotEnoughReplicasAfterAppendException(message: String) extends RuntimeException(message) { + def this() = this(null) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala new file mode 100644 index 0000000..bfbe0ee --- /dev/null +++ b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Message was rejected because number of insync replicas for the partition is lower than min.insync.replicas + */ +class NotEnoughReplicasException(message: String) extends RuntimeException(message) { + def this() = this(null) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/main/scala/kafka/log/LogConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 5746ad4..d2cc9e3 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -36,6 +36,7 @@ object Defaults { val MinCleanableDirtyRatio = 0.5 val Compact = false val UncleanLeaderElectionEnable = true + val MinInSyncReplicas = 1 } /** @@ -53,7 +54,9 @@ object Defaults { * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned * @param compact Should old segments in this log be deleted or deduplicated? 
* @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property - * but included here for topic-specific configuration validation purposes + * but included here for topic-specific configuration validation purposes + * @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks + * */ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, val segmentMs: Long = Defaults.SegmentMs, @@ -68,8 +71,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, val deleteRetentionMs: Long = Defaults.DeleteRetentionMs, val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio, val compact: Boolean = Defaults.Compact, - val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) { - + val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, + val minInSyncReplicas: Int = Defaults.MinInSyncReplicas) { + def toProps: Properties = { val props = new Properties() import LogConfig._ @@ -87,9 +91,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString) props.put(CleanupPolicyProp, if(compact) "compact" else "delete") props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) + props.put(MinInSyncReplicasProp, minInSyncReplicas.toString) props } - } object LogConfig { @@ -107,13 +111,14 @@ object LogConfig { val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio" val CleanupPolicyProp = "cleanup.policy" val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" - - val ConfigNames = Set(SegmentBytesProp, - SegmentMsProp, - SegmentIndexBytesProp, - FlushMessagesProp, - FlushMsProp, - RetentionBytesProp, + val MinInSyncReplicasProp = "min.insync.replicas" + + val ConfigNames = Set(SegmentBytesProp, + SegmentMsProp, + SegmentIndexBytesProp, + FlushMessagesProp, + 
FlushMsProp, + RetentionBytesProp, RententionMsProp, MaxMessageBytesProp, IndexIntervalBytesProp, @@ -121,9 +126,9 @@ object LogConfig { DeleteRetentionMsProp, MinCleanableDirtyRatioProp, CleanupPolicyProp, - UncleanLeaderElectionEnableProp) - - + UncleanLeaderElectionEnableProp, + MinInSyncReplicasProp) + /** * Parse the given properties instance into a LogConfig object */ @@ -144,9 +149,10 @@ object LogConfig { compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete") .trim.toLowerCase != "delete", uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp, - Defaults.UncleanLeaderElectionEnable.toString).toBoolean) + Defaults.UncleanLeaderElectionEnable.toString).toBoolean, + minInSyncReplicas = props.getProperty(MinInSyncReplicasProp,Defaults.MinInSyncReplicas.toString).toInt) } - + /** * Create a log config instance using the given properties and defaults */ @@ -155,7 +161,7 @@ object LogConfig { props.putAll(overrides) fromProps(props) } - + /** * Check that property names are valid */ @@ -164,15 +170,27 @@ object LogConfig { for(name <- props.keys) require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name)) } - + /** * Check that the given properties contain only valid log config names, and that all values can be parsed. 
*/ def validate(props: Properties) { validateNames(props) + validateMinInSyncReplicas(props) LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values } - -} - - \ No newline at end of file + + /** + * Check that MinInSyncReplicas is reasonable + * Unfortunately, we can't validate that it's smaller than the number of replicas + * since we don't have this information here + */ + private def validateMinInSyncReplicas(props: Properties) { + val minIsr = props.getProperty(MinInSyncReplicasProp) + if (minIsr != null && minIsr.toInt < 1) { + throw new InvalidConfigException("Wrong value " + minIsr + " of min.insync.replicas in topic configuration; " + + " Valid values are at least 1") + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/main/scala/kafka/producer/SyncProducerConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala index 69b2d0c..a08ce00 100644 --- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala @@ -42,11 +42,15 @@ trait SyncProducerConfigShared { val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId) /* - * The required acks of the producer requests - negative value means ack - * after the replicas in ISR have caught up to the leader's offset - * corresponding to this produce request. + * The number of acknowledgments the producer requires the leader to have received before considering a request complete. + * This controls the durability of the messages sent by the producer. + * + * request.required.acks = 0 - means the producer will not wait for any acknowledgement from the leader. 
+ * request.required.acks = 1 - means the leader will write the message to its local log and immediately acknowledge + * request.required.acks = -1 - means the leader will wait for acknowledgement from all in-sync replicas before acknowledging the write */ - val requestRequiredAcks = props.getShort("request.required.acks", SyncProducerConfig.DefaultRequiredAcks) + + val requestRequiredAcks = props.getShortInRange("request.required.acks", SyncProducerConfig.DefaultRequiredAcks,(-1,1)) /* * The ack timeout of the producer requests. Value must be non-negative and non-zero @@ -59,4 +63,4 @@ object SyncProducerConfig { val DefaultClientId = "" val DefaultRequiredAcks : Short = 0 val DefaultAckTimeoutMs = 10000 -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c584b55..67f2833 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -248,7 +248,7 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val info = partitionOpt match { case Some(partition) => - partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) + partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks) case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" .format(topicAndPartition, brokerId)) } @@ -284,6 +284,10 @@ class KafkaApis(val requestChannel: RequestChannel, warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage)) 
new ProduceResult(topicAndPartition, nle) + case nere: NotEnoughReplicasException => + warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( + producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nere.getMessage)) + new ProduceResult(topicAndPartition, nere) case e: Throwable => BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index cb87d3d..7fcbc16 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -199,6 +199,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* enable auto creation of topic on the server */ val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true) + /* define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all) */ + val minInSyncReplicas = props.getIntInRange("min.insync.replicas",1,(1,Int.MaxValue)) + + + /*********** Replication configuration ***********/ /* the socket timeout for controller-to-broker channels */ http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 39f777b..209a409 100644 --- 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -18,12 +18,12 @@ package kafka.api import kafka.common.Topic -import org.apache.kafka.common.errors.InvalidTopicException +import org.apache.kafka.common.errors.{InvalidTopicException,NotEnoughReplicasException} import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ -import java.util.Random +import java.util.{Properties, Random} import java.lang.Integer import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException} @@ -302,6 +302,59 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get } + @Test + def testNotEnoughReplicas() { + val topicName = "minisrtest" + val topicProps = new Properties(); + topicProps.put("min.insync.replicas","3"); + + + TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps) + + + val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes) + try { + producer3.send(record).get + fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas") + } catch { + case e: ExecutionException => + if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) { + fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas") + } + } + } + + @Test + def testNotEnoughReplicasAfterBrokerShutdown() { + val topicName = "minisrtest2" + val topicProps = new Properties(); + topicProps.put("min.insync.replicas","2"); + + + TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps) + + + val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes) + // This should work + producer3.send(record).get + + //shut down one broker + servers.head.shutdown() + servers.head.awaitShutdown() + try { + 
producer3.send(record).get + fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas") + } catch { + case e: ExecutionException => + if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) { + fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas") + } + } + + servers.head.startup() + + } + private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) { val numRecords = 1000 http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index dd71d81..ce65dab 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -17,6 +17,7 @@ package kafka.producer +import org.apache.kafka.common.config.ConfigException import org.scalatest.TestFailedException import org.scalatest.junit.JUnit3Suite import kafka.consumer.SimpleConsumer @@ -143,7 +144,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @Test def testSendToNewTopic() { val props1 = new util.Properties() - props1.put("request.required.acks", "2") + props1.put("request.required.acks", "-1") val topic = "new-topic" // create topic with 1 partition and await leadership @@ -181,24 +182,20 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // no need to retry since the send will always fail props2.put("message.send.max.retries", "0") - val producer2 = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[StringEncoder].getName, - partitioner = classOf[StaticPartitioner].getName, 
- producerProps = props2) - try { - producer2.send(new KeyedMessage[String, String](topic, "test", "test2")) - fail("Should have timed out for 3 acks.") + val producer2 = TestUtils.createProducer[String, String]( + brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[StringEncoder].getName, + partitioner = classOf[StaticPartitioner].getName, + producerProps = props2) + producer2.close + fail("we don't support request.required.acks greater than 1") } catch { - case se: FailedToSendMessageException => - // this is expected + case iae: IllegalArgumentException => // this is expected case e: Throwable => fail("Not expected", e) - } - finally { - producer2.close() + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 24deea0..fb61d55 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -18,6 +18,7 @@ package kafka.producer import java.net.SocketTimeoutException +import java.util.Properties import junit.framework.Assert import kafka.admin.AdminUtils import kafka.integration.KafkaServerTestHarness @@ -113,6 +114,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset) } + @Test def testMessageSizeTooLargeWithAckZero() { val server = servers.head @@ -225,4 +227,24 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val response = producer.send(emptyRequest) Assert.assertTrue(response == null) } + + @Test + def testNotEnoughReplicas() { + val topicName = "minisrtest" + val server = 
servers.head + + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + props.put("request.required.acks", "-1") + + val producer = new SyncProducer(new SyncProducerConfig(props)) + val topicProps = new Properties(); + topicProps.put("min.insync.replicas","2"); + AdminUtils.createTopic(zkClient, topicName, 1, 1,topicProps) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0) + + val response = producer.send(TestUtils.produceRequest(topicName, 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1)) + + Assert.assertEquals(ErrorMapping.NotEnoughReplicasCode, response.status(TopicAndPartition(topicName, 0)).error) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4546b9db/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 2dbdd3c..dd3640f 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -168,10 +168,14 @@ object TestUtils extends Logging { * Wait until the leader is elected and the metadata is propagated to all brokers. * Return the leader for each partition. 
*/ - def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, - servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + def createTopic(zkClient: ZkClient, + topic: String, + numPartitions: Int = 1, + replicationFactor: Int = 1, + servers: Seq[KafkaServer], + topicConfig: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = { // create topic - AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor) + AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, topicConfig) // wait until the update metadata request for new topic reaches all servers (0 until numPartitions).map { case i => TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
