kafka git commit: KAFKA-3697; Clean up website documentation of client usage
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 2a059fa68 -> 7de050ee5

KAFKA-3697; Clean up website documentation of client usage

This is to imply that the Java consumer/producer are the recommended consumer/producer now.

Author: Vahid Hashemian
Reviewers: Jason Gustafson

Closes #1921 from vahidhashemian/KAFKA-3697

(cherry picked from commit d2a267b111d23d6b98f2784382095b9ae5ddf886)
Signed-off-by: Jason Gustafson

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7de050ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7de050ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7de050ee

Branch: refs/heads/0.10.1
Commit: 7de050ee50e78992706e0c7f72a80a422650c23a
Parents: 2a059fa
Author: Vahid Hashemian
Authored: Thu Sep 29 19:37:20 2016 -0700
Committer: Jason Gustafson
Committed: Thu Sep 29 19:37:37 2016 -0700

----------------------------------------------------------------------
 docs/configuration.html  | 13 +++--
 docs/documentation.html  | 4 ++--
 docs/implementation.html | 4 ++--
 docs/ops.html            | 8
 docs/quickstart.html     | 10 +-
 docs/security.html       | 2 +-
 docs/upgrade.html        | 2 +-
 7 files changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de050ee/docs/configuration.html
----------------------------------------------------------------------
diff --git a/docs/configuration.html b/docs/configuration.html
index 5428691..35f1475 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -70,9 +70,14 @@ Below is the configuration of the Java producer:
 3.3 Consumer Configs
-We introduce both the old 0.8 consumer configs and the new consumer configs respectively below.
+In 0.9.0.0 we introduced the new Java consumer as a replacement for the older Scala-based simple and high-level consumers.
+The configs for both new and old consumers are described below.
-3.3.1 Old Consumer Configs
+3.3.1 New Consumer Configs
+Below is the configuration for the new consumer:
+
+
+3.3.2 Old Consumer Configs
 The essential old consumer configurations are the following:
@@ -239,10 +244,6 @@ The essential old consumer configurations are the following:
 More details about consumer configuration can be found in the scala class kafka.consumer.ConsumerConfig.
-3.3.2 New Consumer Configs
-Since 0.9.0.0 we have been working on a replacement for our existing simple and high-level consumers. The code is considered beta quality. Below is the configuration for the new consumer:
-
-
 3.4 Kafka Connect Configs
 Below is the configuration of the Kafka Connect framework.

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de050ee/docs/documentation.html
----------------------------------------------------------------------
diff --git a/docs/documentation.html b/docs/documentation.html
index 95d1251..e96fe16 100644
--- a/docs/documentation.html
+++ b/docs/documentation.html
@@ -49,8 +49,8 @@ Prior releases: 0.7.x,
 3.2 Producer Configs
 3.3 Consumer Configs
-3.3.1 Old Consumer Configs
-3.3.2 New Consumer Configs
+3.3.1 New Consumer Configs
+3.3.2 Old Consumer Configs
 3.4 Kafka Connect Configs
 3.5 Kafka Streams Configs

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de050ee/docs/implementation.html
----------------------------------------------------------------------
diff --git a/docs/implementation.html b/docs/implementation.html
index 91e17a6..12846fb 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -40,9 +40,9 @@ class Producer {
 The goal is to expose all the producer functionality through a single API to the client.
-The new producer -
+The Kafka producer
-can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data -
+can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data:
 kafka.producer.Producer provides the ability to batch multiple produce requests (producer.type=async), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either queue.time or batch.size is reached. A background thread (kafka.producer.async.ProducerSendThread) dequeues the batch of data and lets the kafka.producer.EventHandler serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the event.handler config parameter. At various
kafka git commit: KAFKA-3697; Clean up website documentation of client usage
Repository: kafka
Updated Branches:
  refs/heads/trunk 7c6d70655 -> d2a267b11

KAFKA-3697; Clean up website documentation of client usage

This is to imply that the Java consumer/producer are the recommended consumer/producer now.

Author: Vahid Hashemian
Reviewers: Jason Gustafson

Closes #1921 from vahidhashemian/KAFKA-3697

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2a267b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2a267b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2a267b1

Branch: refs/heads/trunk
Commit: d2a267b111d23d6b98f2784382095b9ae5ddf886
Parents: 7c6d706
Author: Vahid Hashemian
Authored: Thu Sep 29 19:37:20 2016 -0700
Committer: Jason Gustafson
Committed: Thu Sep 29 19:37:20 2016 -0700

----------------------------------------------------------------------
 docs/configuration.html  | 13 +++--
 docs/documentation.html  | 4 ++--
 docs/implementation.html | 4 ++--
 docs/ops.html            | 8
 docs/quickstart.html     | 10 +-
 docs/security.html       | 2 +-
 docs/upgrade.html        | 2 +-
 7 files changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2a267b1/docs/configuration.html
----------------------------------------------------------------------
diff --git a/docs/configuration.html b/docs/configuration.html
index 5428691..35f1475 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -70,9 +70,14 @@ Below is the configuration of the Java producer:
 3.3 Consumer Configs
-We introduce both the old 0.8 consumer configs and the new consumer configs respectively below.
+In 0.9.0.0 we introduced the new Java consumer as a replacement for the older Scala-based simple and high-level consumers.
+The configs for both new and old consumers are described below.
-3.3.1 Old Consumer Configs
+3.3.1 New Consumer Configs
+Below is the configuration for the new consumer:
+
+
+3.3.2 Old Consumer Configs
 The essential old consumer configurations are the following:
@@ -239,10 +244,6 @@ The essential old consumer configurations are the following:
 More details about consumer configuration can be found in the scala class kafka.consumer.ConsumerConfig.
-3.3.2 New Consumer Configs
-Since 0.9.0.0 we have been working on a replacement for our existing simple and high-level consumers. The code is considered beta quality. Below is the configuration for the new consumer:
-
-
 3.4 Kafka Connect Configs
 Below is the configuration of the Kafka Connect framework.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2a267b1/docs/documentation.html
----------------------------------------------------------------------
diff --git a/docs/documentation.html b/docs/documentation.html
index f4f1ddc..07ffe84 100644
--- a/docs/documentation.html
+++ b/docs/documentation.html
@@ -46,8 +46,8 @@ Prior releases: 0.7.x,
 3.2 Producer Configs
 3.3 Consumer Configs
-3.3.1 Old Consumer Configs
-3.3.2 New Consumer Configs
+3.3.1 New Consumer Configs
+3.3.2 Old Consumer Configs
 3.4 Kafka Connect Configs
 3.5 Kafka Streams Configs

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2a267b1/docs/implementation.html
----------------------------------------------------------------------
diff --git a/docs/implementation.html b/docs/implementation.html
index 91e17a6..12846fb 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -40,9 +40,9 @@ class Producer {
 The goal is to expose all the producer functionality through a single API to the client.
-The new producer -
+The Kafka producer
-can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data -
+can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data:
 kafka.producer.Producer provides the ability to batch multiple produce requests (producer.type=async), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either queue.time or batch.size is reached. A background thread (kafka.producer.async.ProducerSendThread) dequeues the batch of data and lets the kafka.producer.EventHandler serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the event.handler config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom
kafka git commit: KAFKA-3965; MirrorMaker should not commit offset when exception is thrown from producer.send
Repository: kafka
Updated Branches:
  refs/heads/trunk 0c8c167e8 -> 7c6d70655

KAFKA-3965; MirrorMaker should not commit offset when exception is thrown from producer.send

Author: Jiangjie Qin
Reviewers: Manikumar Reddy, Ismael Juma, Jason Gustafson

Closes #1915 from becketqin/KAFKA-3965

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c6d7065
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c6d7065
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c6d7065

Branch: refs/heads/trunk
Commit: 7c6d70655a7de6a3b688a3e869a091f863de2eef
Parents: 0c8c167
Author: Jiangjie Qin
Authored: Thu Sep 29 16:36:42 2016 -0700
Committer: Jason Gustafson
Committed: Thu Sep 29 16:36:42 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c6d7065/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 979203c..17b8f0b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -444,6 +444,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         }
       } catch {
         case t: Throwable =>
+          exitingOnSendFailure = true
           fatal("Mirror maker thread failure due to ", t)
       } finally {
         CoreUtils.swallow {
kafka git commit: KAFKA-3965; MirrorMaker should not commit offset when exception is thrown from producer.send
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 472b3a558 -> 2a059fa68

KAFKA-3965; MirrorMaker should not commit offset when exception is thrown from producer.send

Author: Jiangjie Qin
Reviewers: Manikumar Reddy, Ismael Juma, Jason Gustafson

Closes #1915 from becketqin/KAFKA-3965

(cherry picked from commit 7c6d70655a7de6a3b688a3e869a091f863de2eef)
Signed-off-by: Jason Gustafson

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a059fa6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a059fa6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a059fa6

Branch: refs/heads/0.10.1
Commit: 2a059fa6882998972f2fe8d21411582c6d999c63
Parents: 472b3a5
Author: Jiangjie Qin
Authored: Thu Sep 29 16:36:42 2016 -0700
Committer: Jason Gustafson
Committed: Thu Sep 29 16:39:25 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a059fa6/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 979203c..17b8f0b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -444,6 +444,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         }
       } catch {
         case t: Throwable =>
+          exitingOnSendFailure = true
           fatal("Mirror maker thread failure due to ", t)
       } finally {
         CoreUtils.swallow {
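
The KAFKA-3965 change sets a flag in the catch block so that later cleanup can tell a failed exit from a clean one. The following is a minimal sketch of that pattern in Python rather than Scala. It assumes the cleanup path checks the flag before committing offsets, which the commit title suggests but the hunk itself does not show. `MirrorLoop`, `flaky_send`, and the offset lists are hypothetical names for illustration, not MirrorMaker's API.

```python
class MirrorLoop:
    """Hypothetical stand-in for a mirroring thread; not MirrorMaker's class."""

    def __init__(self, send):
        self.send = send                     # callable that may raise on failure
        self.exiting_on_send_failure = False
        self.pending = []                    # offsets handed to send() so far
        self.committed = []                  # offsets recorded as committed

    def run(self, records):
        try:
            for offset, value in records:
                self.send(value)
                self.pending.append(offset)
        except Exception:
            # Counterpart of the one-line fix: remember why we are exiting.
            self.exiting_on_send_failure = True
        finally:
            # Assumed cleanup path: commit only when the exit was clean.
            if not self.exiting_on_send_failure:
                self.committed.extend(self.pending)


def flaky_send(value):
    """Hypothetical send function that fails on one specific value."""
    if value == "bad":
        raise RuntimeError("send failed")


ok = MirrorLoop(flaky_send)
ok.run([(0, "a"), (1, "b")])

failed = MirrorLoop(flaky_send)
failed.run([(0, "a"), (1, "bad"), (2, "c")])
```

In this sketch the failed run commits nothing, so a restart would re-read the records instead of skipping the one whose send raised.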
kafka git commit: MINOR: Allow for asynchronous start of producer consumer in validation test
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 18e05117a -> 472b3a558

MINOR: Allow for asynchronous start of producer consumer in validation test

Author: Konstantine Karantasis
Reviewers: Jason Gustafson

Closes #1909 from kkonstantine/MINOR-Async-start-in-produce-consume-validate

(cherry picked from commit 0c8c167e844e53f64df80fca52b9b9b1d3dba8b7)
Signed-off-by: Jason Gustafson

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/472b3a55
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/472b3a55
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/472b3a55

Branch: refs/heads/0.10.1
Commit: 472b3a5584c3486e2f351ff330cd4e319d3f144f
Parents: 18e0511
Author: Konstantine Karantasis
Authored: Thu Sep 29 13:39:01 2016 -0700
Committer: Jason Gustafson
Committed: Thu Sep 29 13:53:04 2016 -0700

----------------------------------------------------------------------
 .../kafkatest/services/kafka/templates/kafka.properties | 4 +++-
 .../services/templates/console_consumer.properties | 3 +++
 tests/kafkatest/tests/produce_consume_validate.py | 11 ++-
 3 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/472b3a55/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 06ec603..7f3a920 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -60,4 +60,6 @@ zookeeper.session.timeout.ms={{ zk_session_timeout }}
 replica.lag.time.max.ms={{replica_lag}}
 {% endif %}
-
+{% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
+auto.create.topics.enable={{ auto_create_topics_enable }}
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/472b3a55/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
index 4bac01f..40ed2f3 100644
--- a/tests/kafkatest/services/templates/console_consumer.properties
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -19,3 +19,6 @@ group.id={{ group_id|default('test-consumer-group') }}
 client.id={{ client_id }}
 {% endif %}
+{% if consumer_metadata_max_age_ms is defined and consumer_metadata_max_age_ms is not none %}
+metadata.max.age.ms={{ consumer_metadata_max_age_ms }}
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/472b3a55/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index a5da7be..be7cda4 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -32,13 +32,13 @@ class ProduceConsumeValidateTest(Test):
     def setup_producer_and_consumer(self):
         raise NotImplementedError("Subclasses should implement this")
 
-    def start_producer_and_consumer(self):
+    def start_producer_and_consumer(self, async=False):
         # Start background producer and consumer
         self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=20,
+        wait_until(lambda: async or self.producer.num_acked > 5, timeout_sec=20,
              err_msg="Producer failed to start in a reasonable amount of time.")
         self.consumer.start()
-        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,
+        wait_until(lambda: async or len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,
              err_msg="Consumer failed to start in a reasonable amount of time.")
 
     def check_alive(self):
@@ -63,10 +63,10 @@ class ProduceConsumeValidateTest(Test):
         self.producer.stop()
         self.consumer.wait()
 
-    def run_produce_consume_validate(self, core_test_action=None, *args):
+    def run_produce_consume_validate(self, async=False, core_test_action=None, *args):
         """Top-level template for simple produce/consume/validate tests."""
         try:
-            self.start_producer_and_consumer()
+            self.start_producer_and_consumer(async)
 
             if core_test_action is not None:
                 core_test_action(*args)
@@ -108,6 +108,7 @@ class ProduceConsumeValidateTest(Test):
         msg = ""
         acked =
kafka git commit: MINOR: Allow for asynchronous start of producer consumer in validation test
Repository: kafka
Updated Branches:
  refs/heads/trunk 2ca9177f4 -> 0c8c167e8

MINOR: Allow for asynchronous start of producer consumer in validation test

Author: Konstantine Karantasis
Reviewers: Jason Gustafson

Closes #1909 from kkonstantine/MINOR-Async-start-in-produce-consume-validate

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c8c167e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c8c167e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c8c167e

Branch: refs/heads/trunk
Commit: 0c8c167e844e53f64df80fca52b9b9b1d3dba8b7
Parents: 2ca9177
Author: Konstantine Karantasis
Authored: Thu Sep 29 13:39:01 2016 -0700
Committer: Jason Gustafson
Committed: Thu Sep 29 13:50:39 2016 -0700

----------------------------------------------------------------------
 .../kafkatest/services/kafka/templates/kafka.properties | 4 +++-
 .../services/templates/console_consumer.properties | 3 +++
 tests/kafkatest/tests/produce_consume_validate.py | 11 ++-
 3 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c8c167e/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 06ec603..7f3a920 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -60,4 +60,6 @@ zookeeper.session.timeout.ms={{ zk_session_timeout }}
 replica.lag.time.max.ms={{replica_lag}}
 {% endif %}
-
+{% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
+auto.create.topics.enable={{ auto_create_topics_enable }}
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c8c167e/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
index 4bac01f..40ed2f3 100644
--- a/tests/kafkatest/services/templates/console_consumer.properties
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -19,3 +19,6 @@ group.id={{ group_id|default('test-consumer-group') }}
 client.id={{ client_id }}
 {% endif %}
+{% if consumer_metadata_max_age_ms is defined and consumer_metadata_max_age_ms is not none %}
+metadata.max.age.ms={{ consumer_metadata_max_age_ms }}
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c8c167e/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index a5da7be..be7cda4 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -32,13 +32,13 @@ class ProduceConsumeValidateTest(Test):
     def setup_producer_and_consumer(self):
         raise NotImplementedError("Subclasses should implement this")
 
-    def start_producer_and_consumer(self):
+    def start_producer_and_consumer(self, async=False):
         # Start background producer and consumer
         self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=20,
+        wait_until(lambda: async or self.producer.num_acked > 5, timeout_sec=20,
              err_msg="Producer failed to start in a reasonable amount of time.")
         self.consumer.start()
-        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,
+        wait_until(lambda: async or len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,
              err_msg="Consumer failed to start in a reasonable amount of time.")
 
     def check_alive(self):
@@ -63,10 +63,10 @@ class ProduceConsumeValidateTest(Test):
         self.producer.stop()
         self.consumer.wait()
 
-    def run_produce_consume_validate(self, core_test_action=None, *args):
+    def run_produce_consume_validate(self, async=False, core_test_action=None, *args):
         """Top-level template for simple produce/consume/validate tests."""
         try:
-            self.start_producer_and_consumer()
+            self.start_producer_and_consumer(async)
 
             if core_test_action is not None:
                 core_test_action(*args)
@@ -108,6 +108,7 @@ class ProduceConsumeValidateTest(Test):
         msg = ""
         acked = self.producer.acked
         consumed = self.consumer.messages_consumed[1]
+        # Correctness of the set difference
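
The diff above turns each readiness predicate into `async or <condition>`, so with the flag set the predicate is truthy on the first poll and the wait returns without blocking. The sketch below illustrates that short-circuit. The `wait_until` here is a simplified stand-in written for this example, not ducktape's implementation, and `FakeProducer` and `start` are hypothetical. The flag is named `async_start` because `async` has been a reserved keyword since Python 3.7 and can no longer be used as a parameter name.

```python
import time


def wait_until(condition, timeout_sec, backoff_sec=0.01, err_msg=""):
    """Simplified polling helper for illustration; not ducktape's wait_until."""
    deadline = time.time() + timeout_sec
    while True:
        if condition():
            return
        if time.time() >= deadline:
            raise TimeoutError(err_msg)
        time.sleep(backoff_sec)


class FakeProducer:
    """Hypothetical producer that never acknowledges a message."""
    num_acked = 0


def start(producer, async_start=False):
    # `async_start or ...` short-circuits: when the flag is set, the
    # readiness check is never evaluated and the wait returns at once.
    wait_until(lambda: async_start or producer.num_acked > 5,
               timeout_sec=0.05,
               err_msg="Producer failed to start in a reasonable amount of time.")
    return "started"


result_async = start(FakeProducer(), async_start=True)

try:
    start(FakeProducer(), async_start=False)
    timed_out = False
except TimeoutError:
    timed_out = True
```

With the flag unset, the same call keeps polling and raises once the timeout expires, which matches the pre-change behaviour of the test.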
kafka-site git commit: Fix another typo on the homepage.
Repository: kafka-site
Updated Branches:
  refs/heads/asf-site da5820751 -> 0ba7502a5

Fix another typo on the homepage.

Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/0ba7502a
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/0ba7502a
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/0ba7502a

Branch: refs/heads/asf-site
Commit: 0ba7502a5838b3bb6aadd92e99ac8d71dd0b93ef
Parents: da58207
Author: Jay Kreps
Authored: Thu Sep 29 09:39:48 2016 -0700
Committer: Jay Kreps
Committed: Thu Sep 29 09:39:48 2016 -0700

----------------------------------------------------------------------
 index.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/0ba7502a/index.html
----------------------------------------------------------------------
diff --git a/index.html b/index.html
index f09b880..7ffc4d7 100644
--- a/index.html
+++ b/index.html
@@ -4,7 +4,7 @@
-Apache Kafka is a distibuted streaming platform. It lets you...
+Apache Kafka is a distributed streaming platform. It lets you...
 Publish and subscribe to streams of data like a messaging system
 Store streams of data in a distributed, replicated cluster
kafka git commit: KAFKA-4216; Control Leader & Follower Throttled Replicas Separately
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 f2066 -> 18e05117a

KAFKA-4216; Control Leader & Follower Throttled Replicas Separately

Splits the throttled replica configuration (the list of which replicas should be throttled for each topic) into two. One for the leader throttle, one for the follower throttle.

So:
 quota.replication.throttled.replicas
=>
 quota.leader.replication.throttled.replicas & quota.follower.replication.throttled.replicas

Author: Ben Stopford
Reviewers: Jun Rao, Ismael Juma

Closes #1906 from benstopford/KAFKA-4216-seperate-leader-and-follower-throttled-replica-lists

(cherry picked from commit 2ca9177f499a07262db0072252d3ca2dfb58)
Signed-off-by: Ismael Juma

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/18e05117
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/18e05117
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/18e05117

Branch: refs/heads/0.10.1
Commit: 18e05117a320fc2a2c6e63a2da29d565c72fb173
Parents: f20
Author: Ben Stopford
Authored: Thu Sep 29 07:55:05 2016 +0100
Committer: Ismael Juma
Committed: Thu Sep 29 08:57:38 2016 +0100

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala | 104 +--
 core/src/main/scala/kafka/log/LogConfig.scala | 25 ++--
 .../main/scala/kafka/server/ConfigHandler.scala | 52
 core/src/main/scala/kafka/utils/CoreUtils.scala | 12 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala | 53 +---
 .../admin/ReassignPartitionsClusterTest.scala | 30 -
 .../admin/ReassignPartitionsCommandTest.scala | 128 +--
 .../kafka/admin/ReplicationQuotaUtils.scala | 11 +-
 .../scala/unit/kafka/log/LogConfigTest.scala | 13 +-
 .../kafka/server/DynamicConfigChangeTest.scala | 28 +++-
 .../unit/kafka/server/DynamicConfigTest.scala | 10 +-
 .../kafka/server/ReplicationQuotasTest.scala | 14 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 7 -
 13 files changed, 315 insertions(+), 172 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2fa75f6..dccc37c 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -16,14 +16,15 @@
  */
 package kafka.admin
 
-import java.util.Properties
 import joptsimple.OptionParser
-import kafka.log.LogConfig
 import kafka.server.{DynamicConfig, ConfigType}
 import kafka.utils._
 import scala.collection._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import kafka.common.{TopicAndPartition, AdminCommandFailedException}
+import kafka.common.{AdminCommandFailedException, TopicAndPartition}
+import kafka.log.LogConfig
+import kafka.log.LogConfig._
+import kafka.utils.CoreUtils._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
 
@@ -74,7 +75,7 @@ object ReassignPartitionsCommand extends Logging {
     removeThrottle(zkUtils, partitionsToBeReassigned, reassignedPartitionsStatus)
   }
 
-  private def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, scala.Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus]): Unit = {
+  private def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus]): Unit = {
     var changed = false
 
     //If all partitions have completed remove the throttle
@@ -92,7 +93,8 @@ object ReassignPartitionsCommand extends Logging {
       val topics = partitionsToBeReassigned.keySet.map(tp => tp.topic).toSeq.distinct
       for (topic <- topics) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
-        if (configs.remove(LogConfig.ThrottledReplicasListProp) != null) {
+        if (configs.remove(LogConfig.LeaderThrottledReplicasListProp) != null
+          || configs.remove(LogConfig.FollowerThrottledReplicasListProp) != null){
           AdminUtils.changeTopicConfig(zkUtils, topic, configs)
           changed = true
         }
@@ -145,7 +147,7 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
-    val partitionsToBeReassigned = parseAndValidate(reassignmentJsonString)
+    val partitionsToBeReassigned =
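
The `removeThrottle` hunk above removes the two new per-topic entries and rewrites the topic config if either was present. One detail worth noting when reading it: Scala's `||` short-circuits, so when the leader entry is found, the second `remove` in that condition is not evaluated. The sketch below, in Python rather than Scala, shows one way to clear both entries unconditionally before combining the results. It is an illustration under stated assumptions, not the command's actual code: the property names are taken from the commit message, and `remove_throttles` and the sample values are hypothetical.

```python
# Property names as written in the commit message above.
LEADER_PROP = "quota.leader.replication.throttled.replicas"
FOLLOWER_PROP = "quota.follower.replication.throttled.replicas"


def remove_throttles(configs):
    """Remove both throttle entries from a topic config dict.

    Returns True when at least one entry was present. Both pops run
    before the results are combined, so neither removal is skipped.
    """
    leader = configs.pop(LEADER_PROP, None)
    follower = configs.pop(FOLLOWER_PROP, None)
    return leader is not None or follower is not None


# Illustrative values only; the real value format is not shown in this email.
topic_config = {
    LEADER_PROP: "0:101,1:102",
    FOLLOWER_PROP: "0:103",
    "retention.ms": "1000",
}
changed = remove_throttles(topic_config)
unchanged = remove_throttles({"retention.ms": "1000"})
```

After the call, only the unrelated `retention.ms` entry remains, and the return value tells the caller whether the topic config needs to be written back.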
kafka git commit: HOTFIX: Tools for releases prior to 0.10.1 need --new-consumer flag
Repository: kafka
Updated Branches:
  refs/heads/trunk 67e99d086 -> a6f3cf56b

HOTFIX: Tools for releases prior to 0.10.1 need --new-consumer flag

Author: Jason Gustafson
Reviewers: Ismael Juma

Closes #1931 from hachikuji/fix-broken-upgrade-tests

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a6f3cf56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a6f3cf56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a6f3cf56

Branch: refs/heads/trunk
Commit: a6f3cf56b3ce5c97e89f4b80c34d54ea608950ee
Parents: 67e99d0
Author: Jason Gustafson
Authored: Thu Sep 29 07:48:40 2016 +0100
Committer: Ismael Juma
Committed: Thu Sep 29 07:48:40 2016 +0100

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py | 4 +++-
 .../kafkatest/services/performance/consumer_performance.py | 9 +
 2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/a6f3cf56/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 237d028..7978963 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -22,7 +22,7 @@ from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, V_0_10_0_0
+from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0
 
 """
 0.8.2.1 ConsoleConsumer options
@@ -176,6 +176,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
               "--topic %(topic)s --consumer.config %(config_file)s" % args
 
         if self.new_consumer:
+            if node.version <= LATEST_0_10_0:
+                cmd += " --new-consumer"
             cmd += " --bootstrap-server %(broker_list)s" % args
         else:
             cmd += " --zookeeper %(zk_connect)s" % args

http://git-wip-us.apache.org/repos/asf/kafka/blob/a6f3cf56/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index cb661e3..a17545d 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -18,7 +18,7 @@ import os
 
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import TRUNK, V_0_9_0_0
+from kafkatest.version import TRUNK, V_0_9_0_0, LATEST_0_10_0
 
 
 class ConsumerPerformanceService(PerformanceService):
@@ -97,8 +97,7 @@ class ConsumerPerformanceService(PerformanceService):
         for node in self.nodes:
             node.version = version
 
-    @property
-    def args(self):
+    def args(self, version):
         """Dictionary of arguments used to start the Consumer Performance script."""
         args = {
             'topic': self.topic,
@@ -106,6 +105,8 @@ class ConsumerPerformanceService(PerformanceService):
         }
 
         if self.new_consumer:
+            if version <= LATEST_0_10_0:
+                args['new-consumer'] = ""
             args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
         else:
             args['zookeeper'] = self.kafka.zk.connect_setting()
@@ -135,7 +136,7 @@ class ConsumerPerformanceService(PerformanceService):
             cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
         cmd += " %s" % self.path.script("kafka-consumer-perf-test.sh", node)
-        for key, value in self.args.items():
+        for key, value in self.args(node.version).items():
             cmd += " --%s %s" % (key, value)
 
         if node.version >= V_0_9_0_0:
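
Both hunks in this hotfix gate the `--new-consumer` flag on the version of the tool being launched. A minimal sketch of that version gate follows, assuming versions can be compared as tuples. The `LATEST_0_10_0` value and the `consumer_flags` helper are hypothetical stand-ins written for this example and are not kafkatest's version objects or service code.

```python
# Hypothetical stand-in for kafkatest's version constant; the real object
# is a version class, not a tuple, and its exact value is assumed here.
LATEST_0_10_0 = (0, 10, 0, 1)


def consumer_flags(version, new_consumer, broker_list, zk_connect):
    """Build the connection flags for a console consumer command line."""
    flags = []
    if new_consumer:
        # Tools from releases before 0.10.1 need an explicit opt-in flag.
        if version <= LATEST_0_10_0:
            flags.append("--new-consumer")
        flags.append("--bootstrap-server %s" % broker_list)
    else:
        flags.append("--zookeeper %s" % zk_connect)
    return " ".join(flags)


old = consumer_flags((0, 9, 0, 1), True, "b:9092", "z:2181")
new = consumer_flags((0, 10, 1, 0), True, "b:9092", "z:2181")
legacy = consumer_flags((0, 9, 0, 1), False, "b:9092", "z:2181")
```

Passing the version in as an argument, as the `args(self, version)` change does, keeps the gate per node, so a mixed-version test can build a different command line for each node.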