Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk
Conflicts:
core/src/main/scala/kafka/admin/AdminUtils.scala
core/src/main/scala/kafka/admin/TopicCommand.scala
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/793b6076
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/793b6076
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/793b6076
Branch: refs/heads/trunk
Commit: 793b60763c0bb073dd5cacf3117b34877f557fb8
Parents: 7c920e9 eedbea6
Author: Sriram Subramanian <[email protected]>
Authored: Mon Nov 18 03:15:27 2013 -0800
Committer: Sriram Subramanian <[email protected]>
Committed: Mon Nov 18 03:15:27 2013 -0800
----------------------------------------------------------------------
DISCLAIMER | 15 --
bin/kafka-add-partitions.sh | 18 --
bin/kafka-console-consumer.sh | 2 +-
bin/kafka-console-producer.sh | 2 +-
bin/kafka-consumer-perf-test.sh | 2 +-
bin/kafka-preferred-replica-election.sh | 2 +-
bin/kafka-producer-perf-test.sh | 2 +-
bin/kafka-reassign-partitions.sh | 2 +-
bin/kafka-replay-log-producer.sh | 2 +-
bin/kafka-run-class.sh | 18 +-
bin/kafka-server-start.sh | 2 +-
bin/kafka-simple-consumer-perf-test.sh | 2 +-
bin/kafka-simple-consumer-shell.sh | 2 +-
bin/kafka-topics.sh | 2 +-
bin/zookeeper-server-start.sh | 2 +-
bin/zookeeper-shell.sh | 2 +-
config/server.properties | 15 +-
.../src/main/scala/kafka/admin/AdminUtils.scala | 30 ++-
.../kafka/admin/ReassignPartitionsCommand.scala | 14 +-
.../main/scala/kafka/admin/TopicCommand.scala | 47 +++-
.../main/scala/kafka/cluster/Partition.scala | 97 +++----
.../main/scala/kafka/common/ErrorMapping.scala | 1 +
.../common/NotAssignedReplicaException.scala | 23 ++
.../scala/kafka/common/TopicAndPartition.scala | 6 +
.../kafka/consumer/ConsumerFetcherManager.scala | 47 ++--
.../kafka/consumer/ConsumerFetcherThread.scala | 2 +-
.../consumer/ZookeeperConsumerConnector.scala | 6 +-
.../kafka/controller/KafkaController.scala | 127 +++++++--
.../kafka/controller/ReplicaStateMachine.scala | 2 +
core/src/main/scala/kafka/log/Log.scala | 34 ++-
core/src/main/scala/kafka/log/LogConfig.scala | 2 +-
core/src/main/scala/kafka/log/LogManager.scala | 22 +-
core/src/main/scala/kafka/log/LogSegment.scala | 4 +-
core/src/main/scala/kafka/log/OffsetIndex.scala | 111 +++++---
.../scala/kafka/network/RequestChannel.scala | 29 ++-
.../kafka/server/AbstractFetcherManager.scala | 48 ++--
.../kafka/server/AbstractFetcherThread.scala | 38 +--
.../src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../main/scala/kafka/server/KafkaConfig.scala | 26 +-
.../scala/kafka/server/KafkaHealthcheck.scala | 14 +-
.../kafka/server/KafkaRequestHandler.scala | 2 +-
.../main/scala/kafka/server/KafkaServer.scala | 4 +-
.../scala/kafka/server/OffsetCheckpoint.scala | 16 +-
.../scala/kafka/server/ReplicaManager.scala | 259 +++++++++++++------
.../scala/kafka/server/TopicConfigManager.scala | 1 -
.../scala/kafka/tools/DumpLogSegments.scala | 12 +-
.../main/scala/kafka/tools/GetOffsetShell.scala | 71 +++--
core/src/main/scala/kafka/utils/Os.scala | 23 ++
core/src/main/scala/kafka/utils/Utils.scala | 13 +
.../kafka/utils/VerifiableProperties.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 12 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 42 ++-
.../unit/kafka/network/SocketServerTest.scala | 18 +-
.../unit/kafka/producer/ProducerTest.scala | 12 +-
.../unit/kafka/server/AdvertiseBrokerTest.scala | 52 ++++
.../unit/kafka/server/KafkaConfigTest.scala | 97 +++++++
.../test/scala/unit/kafka/utils/UtilsTest.scala | 14 +
kafka-patch-review.py | 13 +-
project/Build.scala | 3 +-
.../0.7/bin/kafka-run-class.sh | 8 +-
.../0.7/config/test-log4j.properties | 68 +++++
system_test/utils/kafka_system_test_utils.py | 1 +
62 files changed, 1106 insertions(+), 461 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/793b6076/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/AdminUtils.scala
index 9a8b0c9,8ff4bd5..05fb5b5
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@@ -33,7 -33,6 +33,10 @@@ import scala.Predef.
import collection.Map
import scala.Some
import collection.Set
++<<<<<<< HEAD
+import kafka.common.TopicAndPartition
++=======
++>>>>>>> eedbea6526986783257ad0e025c451a8ee3d9095
object AdminUtils extends Logging {
val rand = new Random
http://git-wip-us.apache.org/repos/asf/kafka/blob/793b6076/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/TopicCommand.scala
index 56f3177,3c08dee..c3eecdd
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@@ -79,8 -79,13 +79,18 @@@ object TopicCommand
def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options,
opts.topicOpt)
val topic = opts.options.valueOf(opts.topicOpt)
++<<<<<<< HEAD
+ if(opts.options.has(opts.configOpt)) {
+ val configs = parseTopicConfigs(opts)
++=======
+ if(opts.options.has(opts.configOpt) ||
opts.options.has(opts.deleteConfigOpt)) {
+ val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
+ val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
+ // compile the final set of configs
+ val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+ configs.putAll(configsToBeAdded)
+ configsToBeDeleted.foreach(config => configs.remove(config))
++>>>>>>> eedbea6526986783257ad0e025c451a8ee3d9095
AdminUtils.changeTopicConfig(zkClient, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
@@@ -188,6 -207,10 +212,13 @@@
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
++<<<<<<< HEAD
++=======
+ val deleteConfigOpt = parser.accepts("deleteConfig", "A topic
configuration override to be removed for an existing topic")
+ .withRequiredArg
+ .describedAs("name")
+ .ofType(classOf[String])
++>>>>>>> eedbea6526986783257ad0e025c451a8ee3d9095
val partitionsOpt = parser.accepts("partitions", "The number of
partitions for the topic being created or " +
"altered (WARNING: If partitions are increased for a topic that has a
key, the partition logic or ordering of the messages will be affected")
.withRequiredArg