kafka-1035; Add message-send-max-retries and retry-backoff-ms options to console producer; patched by Rajasekar Elango; reviewed by Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da451217 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da451217 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da451217 Branch: refs/heads/trunk Commit: da4512174b6f7c395ffe053a86d2c6bb19d2538a Parents: 20953b5 Author: Rajasekar Elango <[email protected]> Authored: Thu Sep 5 07:45:01 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Thu Sep 5 07:45:01 2013 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/producer/ConsoleProducer.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/da451217/core/src/main/scala/kafka/producer/ConsoleProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 5539bce..00cb2e8 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -44,6 +44,14 @@ object ConsoleProducer { .describedAs("size") .ofType(classOf[java.lang.Integer]) .defaultsTo(200) + val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.") + .withRequiredArg + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) + val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. 
Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.") + .withRequiredArg + .ofType(classOf[java.lang.Long]) + .defaultsTo(100) val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + " a message will queue awaiting suffient batch size. The value is given in ms.") .withRequiredArg @@ -97,7 +105,7 @@ object ConsoleProducer { .withRequiredArg .describedAs("prop") .ofType(classOf[String]) - + val options = parser.parse(args : _*) for(arg <- List(topicOpt, brokerListOpt)) { @@ -132,6 +140,9 @@ object ConsoleProducer { props.put("producer.type", if(sync) "sync" else "async") if(options.has(batchSizeOpt)) props.put("batch.num.messages", batchSize.toString) + + props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString) + props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString) props.put("queue.buffering.max.ms", sendTimeout.toString) props.put("queue.buffering.max.messages", queueSize.toString) props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
