[jira] [Commented] (KAFKA-332) Mirroring should use multiple producers; add producer retries to DefaultEventHandler
[ https://issues.apache.org/jira/browse/KAFKA-332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13256026#comment-13256026 ]

Jun Rao commented on KAFKA-332:
-------------------------------

Some comments:
1. DefaultEventHandler:
1.1 It would be useful to see the retry number in the trace log.
1.2 We should capture all Throwables.
2. ProducerConfig: explain a bit more why num.retries is not appropriate for the zk-based producer. Basically, during a resend, we don't re-select brokers.
3. MirrorMaker: The usage of circularIterator is pretty fancy. Would it be simpler to just put all producers in an array and loop through it circularly?

Mirroring should use multiple producers; add producer retries to DefaultEventHandler
------------------------------------------------------------------------------------

Key: KAFKA-332
URL: https://issues.apache.org/jira/browse/KAFKA-332
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
Priority: Minor
Attachments: KAFKA-332-v1.patch

I'm clubbing these two together as they are both important for mirroring.

(1) Multiple producers: Shallow iteration (KAFKA-315) helps improve mirroring throughput when messages are compressed. With shallow iteration, the mirror-maker's consumer does not do deep iteration over compressed messages. However, when its embedded producer sends these messages to the target cluster's brokers, the receiving broker does deep iteration to validate the messages before appending them to the log. In the current (pre-KAFKA-48) request handling mechanism, one producer effectively translates to one server-side thread for handling produce requests, so there is still a decompression bottleneck (due to message validation) on the target broker. One way to work around this is to use broker.list with the same broker listed multiple times, e.g., broker.list=0:localhost:9191,1:localhost:9191,2:localhost:9191,..., which effectively emulates multiple server-side threads. It would be better to just add a num.producers option to the mirror-maker and instantiate that many producers.

(2) Retries: If the mirror-maker uses broker.list and one of the brokers is bounced for any reason, messages can get lost. Message loss can be reduced/avoided if the brokers are behind a VIP and retries are supported. This option will not work for the zk-based producer, because the decision of which broker to send to has already been made, so retries would go to the same (potentially still down) broker. (With KAFKA-253 it would work for zk-based producers as well, but that is only in 0.8.)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
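Jun's comment 3 (an array of producers looped through circularly) can be sketched as below. This is an illustration of the suggested pattern only, not the actual MirrorMaker code; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin producer pool: keep the producers in a plain
// array and advance an index circularly, instead of a circular iterator.
public class ProducerPool<P> {
    private final P[] producers;
    private final AtomicInteger next = new AtomicInteger(0);

    public ProducerPool(P[] producers) {
        if (producers.length == 0)
            throw new IllegalArgumentException("need at least one producer");
        this.producers = producers;
    }

    /** Returns producers[0], producers[1], ..., wrapping around circularly. */
    public P nextProducer() {
        // floorMod keeps the index non-negative even if the counter overflows.
        return producers[Math.floorMod(next.getAndIncrement(), producers.length)];
    }
}
```

A num.producers option would then just control the size of the array handed to this pool.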
[jira] [Commented] (KAFKA-331) recurrent produce errors
[ https://issues.apache.org/jira/browse/KAFKA-331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13252476#comment-13252476 ]

Jun Rao commented on KAFKA-331:
-------------------------------

The producer is thread safe. Are you using the sync or the async mode in the producer? Do you have a simple test that can reproduce this?

recurrent produce errors
------------------------

Key: KAFKA-331
URL: https://issues.apache.org/jira/browse/KAFKA-331
Project: Kafka
Issue Type: Bug
Reporter: Pierre-Yves Ritschard

I am using trunk and regularly see such errors popping up:

32477890 [kafka-processor-7] ERROR kafka.server.KafkaRequestHandlers - Error processing ProduceRequest on pref^@^@^@:0
java.io.FileNotFoundException: /mnt/kafka/logs/pref^@^@^@-0/.kafka (Is a directory)
	at java.io.RandomAccessFile.open(Native Method)
	at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
	at kafka.utils.Utils$.openChannel(Utils.scala:324)
	at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
	at kafka.log.Log.loadSegments(Log.scala:144)
	at kafka.log.Log.<init>(Log.scala:116)
	at kafka.log.LogManager.createLog(LogManager.scala:149)
	at kafka.log.LogManager.getOrCreateLog(LogManager.scala:204)
	at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
	at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
	at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
	at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
	at kafka.network.Processor.handle(SocketServer.scala:296)
	at kafka.network.Processor.read(SocketServer.scala:319)
	at kafka.network.Processor.run(SocketServer.scala:214)
	at java.lang.Thread.run(Thread.java:679)

32477890 [kafka-processor-7] ERROR kafka.network.Processor - Closing socket for /xx.xx.xx.xx because of error
java.io.FileNotFoundException: /mnt/kafka/logs/pref^@^@^@-0/.kafka (Is a directory)
	at java.io.RandomAccessFile.open(Native Method)
	at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
	at kafka.utils.Utils$.openChannel(Utils.scala:324)
	at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
	at kafka.log.Log.loadSegments(Log.scala:144)
	at kafka.log.Log.<init>(Log.scala:116)
	at kafka.log.LogManager.createLog(LogManager.scala:149)
	at kafka.log.LogManager.getOrCreateLog(LogManager.scala:204)
	at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
	at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
	at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
	at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
	at kafka.network.Processor.handle(SocketServer.scala:296)
	at kafka.network.Processor.read(SocketServer.scala:319)
	at kafka.network.Processor.run(SocketServer.scala:214)
	at java.lang.Thread.run(Thread.java:679)

This results in a "pref" directory created inside the log dir. The original topic should be "prefix"; somehow a NUL gets inserted there. The producing was done with a kafka.javaapi.producer.Producer instance, on which send was called with a kafka.javaapi.producer.ProducerData instance. There are no log entries created inside that dir and no impact on the overall operation of the broker and consumers. Is the producer thread-safe?
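The reported symptom (a topic name with embedded NUL bytes becoming a truncated "pref^@^@^@-0" directory) suggests a defensive check the broker could apply before using a topic name as a path component. The class below is purely illustrative, not Kafka code:

```java
// Hypothetical validation: reject topic names containing control characters
// (including NUL, '\0') before they are turned into log-directory names,
// so a corrupted name like "pref\0\0\0" fails fast instead of creating a
// mangled directory on disk.
public class TopicNameValidator {
    public static void validate(String topic) {
        for (int i = 0; i < topic.length(); i++) {
            if (Character.isISOControl(topic.charAt(i)))
                throw new IllegalArgumentException(
                    "Topic name contains a control character at index " + i);
        }
    }
}
```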
[jira] [Commented] (KAFKA-301) Implement the broker startup procedure
[ https://issues.apache.org/jira/browse/KAFKA-301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13251711#comment-13251711 ]

Jun Rao commented on KAFKA-301:
-------------------------------

Can't seem to apply patch v1 to the 0.8 branch:

patching file core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
patching file core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
patching file core/src/test/scala/unit/kafka/server/StateChangeTest.scala
patching file core/src/main/scala/kafka/producer/Producer.scala
patching file core/src/main/scala/kafka/producer/async/QueueFullException.scala
patching file core/src/main/scala/kafka/admin/AdminUtils.scala
patching file core/src/main/scala/kafka/common/NoEpochForPartitionException.scala
can't find file to patch at input line 374
Perhaps you used the wrong -p or --strip option?
The text leading up to this was:
--------------------------
|Index: core/src/main/scala/kafka/common/QueueFullException.scala
|===
|--- core/src/main/scala/kafka/common/QueueFullException.scala (revision 1304473)
|+++ core/src/main/scala/kafka/common/QueueFullException.scala (working copy)
--------------------------
File to patch:

Implement the broker startup procedure
--------------------------------------

Key: KAFKA-301
URL: https://issues.apache.org/jira/browse/KAFKA-301
Project: Kafka
Issue Type: Sub-task
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Attachments: kafka-301-draft.patch, kafka-301-v1.patch

This JIRA will involve implementing the list of actions to be taken on broker startup, as listed by the brokerStartup() and startReplica() algorithms in the Kafka replication design doc. Since the stateChangeListener is part of KAFKA-44, this JIRA can leave it as a stub.
[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request
[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13250856#comment-13250856 ]

Jun Rao commented on KAFKA-48:
------------------------------

Patch v4 looks good. Just one more comment:
41. RequestPurgatory.update(): if (w == null), could we return a singleton empty array, instead of creating a new one every time?

Implement optional long poll support in fetch request
-----------------------------------------------------

Key: KAFKA-48
URL: https://issues.apache.org/jira/browse/KAFKA-48
Project: Kafka
Issue Type: Bug
Reporter: Jun Rao
Assignee: Jay Kreps
Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48-v4.patch, KAFKA-48.patch, kafka-48-v3-to-v4-changes.diff

Currently, the fetch request is non-blocking. If there is nothing on the broker for the consumer to retrieve, the broker simply returns an empty set to the consumer. This can be inefficient if you want to ensure low latency, because you keep polling over and over. We should make a blocking version of the fetch request so that the fetch request is not returned until the broker has at least one message for the fetcher or some timeout passes.
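The singleton-empty-array idea from comment 41, shown in miniature. This is a generic sketch of the allocation-avoidance pattern, not the real RequestPurgatory code; the class and method here are hypothetical:

```java
// Hypothetical: return one shared, effectively-immutable empty array on the
// common null path instead of allocating a fresh zero-length array per call.
// Empty arrays are safe to share because they have no mutable contents.
public class Watchers {
    private static final Object[] EMPTY = new Object[0];

    static Object[] update(Object w) {
        if (w == null)
            return EMPTY;              // shared instance, no per-call allocation
        return new Object[] { w };     // non-empty path, illustrative only
    }
}
```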
[jira] [Commented] (KAFKA-249) Separate out Kafka mirroring into a stand-alone app
[ https://issues.apache.org/jira/browse/KAFKA-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13248898#comment-13248898 ]

Jun Rao commented on KAFKA-249:
-------------------------------

Actually, just committed kafka-315. Could you rebase?

Separate out Kafka mirroring into a stand-alone app
---------------------------------------------------

Key: KAFKA-249
URL: https://issues.apache.org/jira/browse/KAFKA-249
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
Fix For: 0.7.1
Attachments: KAFKA-249.v1.patch, KAFKA-249.v2.patch, KAFKA-249.v3-v4.incremental.patch, KAFKA-249.v3.patch, KAFKA-249.v4.patch

I would like to discuss on this jira the feasibility/benefits of separating out Kafka's mirroring feature from the broker into a stand-alone app, as it currently has a couple of limitations and issues. For example, we recently had to deal with Kafka mirrors that were in fact idle because mirror threads were not created at start-up due to a rebalancing exception, but the Kafka broker itself did not shut down. This has since been fixed, but it is indicative of (avoidable) problems in embedding non-broker-specific features in the broker. Logically, it seems to make sense to separate it out to achieve a better division of labor.

Furthermore, enhancements to mirroring may be less clunky to implement and use with a stand-alone app. For example, to support custom partitioning on the target cluster, or to mirror from multiple clusters, we would probably need to be able to pass in multiple embedded consumer/embedded producer configs, which would be less ugly if the mirroring process were a stand-alone app. Also, if we break it out, it would be convenient to use as a consumption engine for the console consumer, which will make it easier to add features such as wildcards in topic consumption, since it contains a ZooKeeper topic discovery component. Any suggestions and/or objections to this?
[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
[ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13249129#comment-13249129 ]

Jun Rao commented on KAFKA-320:
-------------------------------

Ok, we can take v3 for now and track the broker startup/shutdown in kafka-328.

testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
--------------------------------------------------------------------------

Key: KAFKA-320
URL: https://issues.apache.org/jira/browse/KAFKA-320
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.7, 0.8
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Critical
Attachments: kafka-320-v2.patch, kafka-320-v3-delta.patch, kafka-320-v3.patch, kafka-320.patch

The testZKSendWithDeadBroker test inside ProducerTest fails intermittently with the following exception:

[error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.
	at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
	at kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
	at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
	at kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:174)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at junit.framework.TestCase.runTest(TestCase.java:164)
	at junit.framework.TestCase.runBare(TestCase.java:130)
	at junit.framework.TestResult$1.protect(TestResult.java:110)
	at junit.framework.TestResult.runProtected(TestResult.java:128)
	at junit.framework.TestResult.run(TestResult.java:113)
	at junit.framework.TestCase.run(TestCase.java:120)
	at junit.framework.TestSuite.runTest(TestSuite.java:228)
	at junit.framework.TestSuite.run(TestSuite.java:223)
	at junit.framework.TestSuite.runTest(TestSuite.java:228)
	at junit.framework.TestSuite.run(TestSuite.java:223)
	at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
	at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
	at sbt.TestRunner.run(TestFramework.scala:53)
	at sbt.TestRunner.runTest$1(TestFramework.scala:67)
	at sbt.TestRunner.run(TestFramework.scala:76)
	at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
	at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
	at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
	at sbt.NamedTestTask.run(TestFramework.scala:92)
	at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
	at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
	at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
	at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
	at sbt.impl.RunTask.runTask(RunTask.scala:85)
	at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
	at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
	at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
	at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
	at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
	at sbt.Control$.trapUnit(Control.scala:19)
	at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

The test basically restarts a server and fails with this exception during the restart. This is unexpected, since server1, after shutting down, should trigger the deletion of its broker id registration from ZK. But here is the Kafka bug causing this problem: in the test, during server1.shutdown(), we do close the zkClient associated with the broker, and it successfully deletes the broker's registration info from Zookeeper. After this, server1 can be successfully started. Then the test completes and, in teardown(), we call server1.shutdown(). During
[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request
[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13247409#comment-13247409 ]

Jun Rao commented on KAFKA-48:
------------------------------

Thanks for patch v3. Some comments:

31. DelayedFetch is keyed off topic. It should be keyed off (topic, partition), since a consumer may be interested in only a subset of partitions within a topic.

32. KafkaApis: The following 3 lines are duplicated in 2 places:

val topicData = readMessageSets(delayed.fetch.offsetInfo)
val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))

Should we put them in a private method and share the code?

33. ExpiredRequestReaper.purgeExpired(): We need to decrement the unsatisfied count here.

34. FetchRequest: Can we have the default constants for correlationId, clientId, etc. defined and shared between the constructor and the request builder?

35. MessageSetSend.empty is unused. Should we remove it?

Implement optional long poll support in fetch request
-----------------------------------------------------

Key: KAFKA-48
URL: https://issues.apache.org/jira/browse/KAFKA-48
Project: Kafka
Issue Type: Bug
Reporter: Jun Rao
Assignee: Jay Kreps
Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48.patch

Currently, the fetch request is non-blocking. If there is nothing on the broker for the consumer to retrieve, the broker simply returns an empty set to the consumer. This can be inefficient if you want to ensure low latency, because you keep polling over and over. We should make a blocking version of the fetch request so that the fetch request is not returned until the broker has at least one message for the fetcher or some timeout passes.
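Comment 31's (topic, partition) keying can be sketched as below. This is an illustrative data-structure sketch, not the actual DelayedFetch/RequestPurgatory code; all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical index of delayed fetches keyed by (topic, partition) rather
// than topic alone, so a watcher fires only for partitions the consumer
// actually asked about.
public class DelayedFetchIndex {
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }
        @Override public int hashCode() {
            return 31 * topic.hashCode() + partition;
        }
    }

    private final Map<TopicPartition, List<Object>> watchers = new HashMap<>();

    void watch(String topic, int partition, Object delayedFetch) {
        watchers.computeIfAbsent(new TopicPartition(topic, partition),
                                 k -> new ArrayList<>()).add(delayedFetch);
    }

    List<Object> watchersFor(String topic, int partition) {
        return watchers.getOrDefault(new TopicPartition(topic, partition),
                                     Collections.emptyList());
    }
}
```

The composite key ensures a produce to partition 1 of a topic does not wake a fetch that was waiting only on partition 0.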
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13247458#comment-13247458 ]

Jun Rao commented on KAFKA-49:
------------------------------

Prashanth, thanks for the patch. It seems that kafka-48 is almost ready. Since that's a relatively large patch, I will commit your patch after kafka-48 is committed.

Add acknowledgement to the produce request.
-------------------------------------------

Key: KAFKA-49
URL: https://issues.apache.org/jira/browse/KAFKA-49
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Prashanth Menon
Fix For: 0.8
Attachments: KAFKA-49-continued-v2.patch, KAFKA-49-continued.patch, KAFKA-49-v1.patch, KAFKA-49-v2.patch, KAFKA-49-v3.patch

Currently, the produce request doesn't get acknowledged. We need to have the broker send a response to the producer and have the producer wait for the response before sending the next request.
[jira] [Commented] (KAFKA-249) Separate out Kafka mirroring into a stand-alone app
[ https://issues.apache.org/jira/browse/KAFKA-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13246497#comment-13246497 ]

Jun Rao commented on KAFKA-249:
-------------------------------

v2 looks much better now. Some additional comments:

21. Should isTopicAllowed be part of TopicFilterSpec, especially if we want to extend it in the future? If so, we don't need TopicFilter.

22. In this patch, I suggest that we only put the message and topic in MessageAndMetadata. We can have a separate jira on how to expose offsets to the consumer. There, we need to discuss how a consumer can rewind consumption using the returned offset.

23. It's probably better to rename KafkaMessageAndMetadataStream to KafkaStream.

24. ZookeeperConsumerConnector:
24.1 reinitializeConsumer: I think it will make the code easier to understand if we explicitly define the types of consumerThreadIdsPerTopic, topicThreadIds and threadQueueStreamPairs. It would also be very useful to explicitly define the return type of consumerThreadIdsPerTopic.flatten.
24.2 reinitializeConsumer: This method is called every time a new topic is discovered. It feels strange that we have to register the consumer here. Ideally, each consumer is registered exactly once. Also, it seems that each time this method is called, we only add new entries to loadBalancerListener.kafkaMessageAndMetadataStreams. Shouldn't we clear this map first so that deleted topics can be removed?

25. ByteBufferMessageSet: It's not clear to me that the iterator of ByteBufferMessageSet should return MessageAndMetadata, because ByteBufferMessageSet itself doesn't know all the metadata, such as topic and partition. So the iterator of this class should probably remain MessageAndOffset; MessageAndMetadata is only used for the client api.

26. MirrorMaker: The shutdown hook should close the producer.

Separate out Kafka mirroring into a stand-alone app
---------------------------------------------------

Key: KAFKA-249
URL: https://issues.apache.org/jira/browse/KAFKA-249
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
Fix For: 0.7.1
Attachments: KAFKA-249.v1.patch, KAFKA-249.v2.patch, KAFKA-249.v3.patch

I would like to discuss on this jira the feasibility/benefits of separating out Kafka's mirroring feature from the broker into a stand-alone app, as it currently has a couple of limitations and issues. For example, we recently had to deal with Kafka mirrors that were in fact idle because mirror threads were not created at start-up due to a rebalancing exception, but the Kafka broker itself did not shut down. This has since been fixed, but it is indicative of (avoidable) problems in embedding non-broker-specific features in the broker. Logically, it seems to make sense to separate it out to achieve a better division of labor.

Furthermore, enhancements to mirroring may be less clunky to implement and use with a stand-alone app. For example, to support custom partitioning on the target cluster, or to mirror from multiple clusters, we would probably need to be able to pass in multiple embedded consumer/embedded producer configs, which would be less ugly if the mirroring process were a stand-alone app. Also, if we break it out, it would be convenient to use as a consumption engine for the console consumer, which will make it easier to add features such as wildcards in topic consumption, since it contains a ZooKeeper topic discovery component. Any suggestions and/or objections to this?
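The shape comment 22 argues for (a stream element carrying only the message and its topic, deferring partition and offset to a later jira) might look like the following. This is a sketch of the proposed class shape, not the committed code:

```java
// Hypothetical stream element: the message plus the metadata known so far.
// Per comment 22, that is just the topic for now; partition id and offset
// could be added later once offset exposure is designed.
public class MessageAndMetadata<M> {
    public final M message;
    public final String topic;

    public MessageAndMetadata(M message, String topic) {
        this.message = message;
        this.topic = topic;
    }
}
```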
[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
[ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13244395#comment-13244395 ]

Jun Rao commented on KAFKA-320:
-------------------------------

The ZookeeperTestHarness change looks nice. A couple more comments:

4. KafkaServer: It does look a bit more complex now, and some of the testing is not done atomically. How about the following?
4.1 Add an AtomicBoolean isServerStartable and initialize it to true.
4.2 In startup(), if we can atomically set isServerStartable from true to false, proceed with startup; otherwise throw an exception.
4.3 In shutdown(), if isServerStartable is false, proceed with shutdown; at the very end, set isServerStartable to true.
startup() and shutdown() are expected to be called from the same thread, so we can expect that a shutdown won't be called until a startup completes.

5. SyncProducerTest: unused imports.

testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
--------------------------------------------------------------------------

Key: KAFKA-320
URL: https://issues.apache.org/jira/browse/KAFKA-320
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.7, 0.8
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Critical
Attachments: kafka-320-v2.patch, kafka-320.patch

The testZKSendWithDeadBroker test inside ProducerTest fails intermittently with the following exception:

[error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.
	at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
	at kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
	at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
	at kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:174)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at junit.framework.TestCase.runTest(TestCase.java:164)
	at junit.framework.TestCase.runBare(TestCase.java:130)
	at junit.framework.TestResult$1.protect(TestResult.java:110)
	at junit.framework.TestResult.runProtected(TestResult.java:128)
	at junit.framework.TestResult.run(TestResult.java:113)
	at junit.framework.TestCase.run(TestCase.java:120)
	at junit.framework.TestSuite.runTest(TestSuite.java:228)
	at junit.framework.TestSuite.run(TestSuite.java:223)
	at junit.framework.TestSuite.runTest(TestSuite.java:228)
	at junit.framework.TestSuite.run(TestSuite.java:223)
	at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
	at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
	at sbt.TestRunner.run(TestFramework.scala:53)
	at sbt.TestRunner.runTest$1(TestFramework.scala:67)
	at sbt.TestRunner.run(TestFramework.scala:76)
	at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
	at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
	at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
	at sbt.NamedTestTask.run(TestFramework.scala:92)
	at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
	at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
	at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
	at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
	at sbt.impl.RunTask.runTask(RunTask.scala:85)
	at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
	at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
	at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
	at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
	at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
	at sbt.Control$.trapUnit(Control.scala:19)
	at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)
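Jun's suggestion 4 from the KAFKA-320 review above (guard startup/shutdown with a single AtomicBoolean so the check-and-set is atomic) can be sketched as follows. The class is hypothetical, not the actual KafkaServer:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical server lifecycle guard following suggestions 4.1-4.3:
// startup() atomically flips true -> false or fails fast; shutdown() runs
// only if a startup happened, and re-arms the flag as its very last step.
public class StartableServer {
    private final AtomicBoolean isServerStartable = new AtomicBoolean(true);

    public void startup() {
        if (!isServerStartable.compareAndSet(true, false))
            throw new IllegalStateException("Server is already started");
        // ... start log manager, socket server, register broker in ZK ...
    }

    public void shutdown() {
        if (!isServerStartable.get()) {
            // ... close ZK client, socket server, log manager ...
            isServerStartable.set(true);  // last step: permit a future startup
        }
    }
}
```

With startup() and shutdown() called from the same thread, this makes a double startup fail deterministically while still allowing a startup-shutdown-startup cycle.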
[jira] [Commented] (KAFKA-249) Separate out Kafka mirroring into a stand-alone app
[ https://issues.apache.org/jira/browse/KAFKA-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13240166#comment-13240166 ]

Jun Rao commented on KAFKA-249:
-------------------------------

Thanks for the patch. Some comments:

1. For future extension, I am thinking that we should probably unify KafkaMessageStream and KafkaMessageAndTopicStream into something like KafkaMessageMetadataStream. The stream gives an iterator of Message and its associated metadata. For now, the metadata can be just the topic. In the future, it may include things like partition id and offset.

2. ZookeeperConsumerConnector:
2.1 updateFetcher: no need to pass in messageStreams.
2.2 ZKRebalancerListener: It seems that kafkaMessageStream can be immutable.
2.3 createMessageStreamByFilter: topicsStreamsMap is empty when passed to ZKRebalanceListener. This means that the queue is not cleared during rebalance.
2.4 consumeWildCardTopics: I find the code in this method hard to read. Is there a real benefit to using implicit conversion here, instead of explicit conversion? It's not clear to me where the conversion is used. The 2-level tuple makes it hard to figure out what the referred fields represent. Is the code relying on groupedTopicThreadIds being sorted by (topic, threadid)? If so, where is that enforced?

3. KafkaServerStartable: Should we remove the embedded consumer now?

4. Utils, UtilsTest: unused imports.

Separate out Kafka mirroring into a stand-alone app
---------------------------------------------------

Key: KAFKA-249
URL: https://issues.apache.org/jira/browse/KAFKA-249
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
Fix For: 0.7.1
Attachments: KAFKA-249.v1.patch, KAFKA-249.v2.patch

I would like to discuss on this jira the feasibility/benefits of separating out Kafka's mirroring feature from the broker into a stand-alone app, as it currently has a couple of limitations and issues. For example, we recently had to deal with Kafka mirrors that were in fact idle because mirror threads were not created at start-up due to a rebalancing exception, but the Kafka broker itself did not shut down. This has since been fixed, but it is indicative of (avoidable) problems in embedding non-broker-specific features in the broker. Logically, it seems to make sense to separate it out to achieve a better division of labor.

Furthermore, enhancements to mirroring may be less clunky to implement and use with a stand-alone app. For example, to support custom partitioning on the target cluster, or to mirror from multiple clusters, we would probably need to be able to pass in multiple embedded consumer/embedded producer configs, which would be less ugly if the mirroring process were a stand-alone app. Also, if we break it out, it would be convenient to use as a consumption engine for the console consumer, which will make it easier to add features such as wildcards in topic consumption, since it contains a ZooKeeper topic discovery component. Any suggestions and/or objections to this?
[jira] [Commented] (KAFKA-307) Refactor server code to remove interdependencies between LogManager and KafkaZooKeeper
[ https://issues.apache.org/jira/browse/KAFKA-307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13236734#comment-13236734 ] Jun Rao commented on KAFKA-307: --- +1 on patch v3. Refactor server code to remove interdependencies between LogManager and KafkaZooKeeper -- Key: KAFKA-307 URL: https://issues.apache.org/jira/browse/KAFKA-307 Project: Kafka Issue Type: Bug Affects Versions: 0.7, 0.8 Reporter: Neha Narkhede Attachments: kafka-307-draft.patch, kafka-307-v2.patch, kafka-307-v3.patch Currently, LogManager wraps KafkaZooKeeper which is meant for all zookeeper interaction of a Kafka server. With replication, KafkaZookeeper will handle leader election, various state change listeners and then start replicas. Due to interdependency between LogManager and KafkaZookeeper, starting replicas is not possible until LogManager starts up completely. Due to this, we have to separate the broker startup procedures required for replication to get around this problem. It will be good to refactor and clean up the server code, before diving deeper into replication.
[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout
[ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13236782#comment-13236782 ] Jun Rao commented on KAFKA-305: --- Prashanth, v2 patch looks good. As for 5, I do see transient failures of testZKSendWithDeadBroker. This is a bit weird. During broker shutdown, we close the ZK client, which should cause all ephemeral nodes to be deleted in ZK. Could you verify if this is indeed the behavior of ZK? As for BrokerPartitionInfo and ProducerPool, we should clean up dead brokers. Could you open a separate jira to track that? SyncProducer does not correctly timeout --- Key: KAFKA-305 URL: https://issues.apache.org/jira/browse/KAFKA-305 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7, 0.8 Reporter: Prashanth Menon Priority: Critical Attachments: KAFKA-305-v1.patch, KAFKA-305-v2.patch So it turns out that using the channel in SyncProducer as we do to perform blocking reads will not trigger socket timeouts (though we set them) and will block forever, which is bad. This bug identifies the issue: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article presents a potential work-around: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel The work-around is a simple solution that involves creating a separate ReadableByteChannel instance for timeout-enabled reads.
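The shape of the work-around can be sketched as follows. This is a hypothetical demo, not the actual KAFKA-305 patch: it assumes only that a ReadableByteChannel wrapped around a socket's InputStream honors SO_TIMEOUT, whereas a read on a blocking SocketChannel does not (JDK bug 4614802).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Illustrative sketch (not the KAFKA-305 patch): timeout-enabled reads via a
// separate ReadableByteChannel built from the socket's InputStream.
public class TimeoutReadDemo {
    // Returns true if the read timed out rather than blocking forever.
    static boolean readTimesOut(int timeoutMs) throws IOException {
        try (ServerSocket server = new ServerSocket(0);   // silent server: accepts, never writes
             Socket client = new Socket()) {
            client.connect(new InetSocketAddress("localhost", server.getLocalPort()));
            client.setSoTimeout(timeoutMs);               // read timeout in ms
            try (Socket serverSide = server.accept()) {
                // The wrapper channel delegates to the InputStream, so SO_TIMEOUT applies.
                ReadableByteChannel ch = Channels.newChannel(client.getInputStream());
                try {
                    ch.read(ByteBuffer.allocate(16));     // no data will ever arrive
                    return false;
                } catch (SocketTimeoutException e) {
                    return true;                          // timeout fired as configured
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readTimesOut(200) ? "timed out as expected" : "read returned");
    }
}
```

A mock server that delays (or never sends) its response, as suggested in the review comments, is exactly what the silent `ServerSocket` above stands in for.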
[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout
[ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13237021#comment-13237021 ] Jun Rao commented on KAFKA-305: --- If this is indeed a ZK issue, we can probably check/wait that the ephemeral node is gone before restarting the broker. SyncProducer does not correctly timeout --- Key: KAFKA-305 URL: https://issues.apache.org/jira/browse/KAFKA-305 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7, 0.8 Reporter: Prashanth Menon Priority: Critical Attachments: KAFKA-305-v1.patch, KAFKA-305-v2.patch So it turns out that using the channel in SyncProducer as we do to perform blocking reads will not trigger socket timeouts (though we set them) and will block forever, which is bad. This bug identifies the issue: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article presents a potential work-around: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel The work-around is a simple solution that involves creating a separate ReadableByteChannel instance for timeout-enabled reads.
[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout
[ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13235614#comment-13235614 ] Jun Rao commented on KAFKA-305: --- Prashanth, 2. If you want to make sure that a broker is shut down, you need to call kafkaServer.awaitShutdown after calling kafkaServer.shutdown. Overall, I don't quite understand how the new test works. It only brought down 1 broker and yet the comment says all brokers are down. If all brokers are indeed down, any RPC call to the broker should get a broken pipe or socket closed exception immediately, not a SocketTimeoutException. So, to really test that the timeout works, we need to keep the broker alive and somehow delay the response from the server. This can probably be done with a mock request handler. SyncProducer does not correctly timeout --- Key: KAFKA-305 URL: https://issues.apache.org/jira/browse/KAFKA-305 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7, 0.8 Reporter: Prashanth Menon Priority: Critical Attachments: KAFKA-305-v1.patch So it turns out that using the channel in SyncProducer as we do to perform blocking reads will not trigger socket timeouts (though we set them) and will block forever, which is bad. This bug identifies the issue: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article presents a potential work-around: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel The work-around is a simple solution that involves creating a separate ReadableByteChannel instance for timeout-enabled reads.
[jira] [Commented] (KAFKA-310) Incomplete message set validation checks in kafka.log.Log's append API can corrupt on disk log
[ https://issues.apache.org/jira/browse/KAFKA-310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13235636#comment-13235636 ] Jun Rao commented on KAFKA-310: --- ByteBufferMessageSet.validBytes currently does a deep iteration of all messages, which means that we need to decompress messages. To avoid this overhead, we should change ByteBufferMessageSet.validBytes to use a shallow iterator. Incomplete message set validation checks in kafka.log.Log's append API can corrupt on disk log -- Key: KAFKA-310 URL: https://issues.apache.org/jira/browse/KAFKA-310 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Neha Narkhede Priority: Critical Attachments: kafka-310.patch The behavior of the ByteBufferMessageSet's iterator is to ignore and return false if some trailing bytes are found that cannot be deserialized into a Kafka message. The append API in Log iterates through a ByteBufferMessageSet and validates the checksum of each message. However, while appending data to the log, it just uses the underlying ByteBuffer that forms the ByteBufferMessageSet. Now, due to some bug, if the ByteBuffer has some trailing data, that will get appended to the on-disk log too. This can cause corruption of the log.
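The idea of a shallow validBytes scan can be sketched like this. The framing here is simplified (a bare 4-byte length header per entry; the real ByteBufferMessageSet format carries more fields), but it shows how complete entries can be counted by skipping payloads without decompressing or parsing them.

```java
import java.nio.ByteBuffer;

// Simplified sketch of a shallow validBytes scan over size-prefixed entries.
public class ShallowScan {
    static int validBytes(ByteBuffer buffer) {
        ByteBuffer b = buffer.duplicate();   // don't disturb the caller's position
        int valid = 0;
        while (b.remaining() >= 4) {
            int size = b.getInt();
            if (size < 0 || b.remaining() < size)
                break;                       // incomplete trailing entry: stop counting here
            b.position(b.position() + size); // shallow skip: payload is never decompressed
            valid += 4 + size;
        }
        return valid;                        // bytes safe to append to the log
    }
}
```

Appending only `validBytes` of the buffer (rather than the whole underlying ByteBuffer) is what keeps trailing garbage out of the on-disk log.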
[jira] [Commented] (KAFKA-310) Incomplete message set validation checks in kafka.log.Log's append API can corrupt on disk log
[ https://issues.apache.org/jira/browse/KAFKA-310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13235675#comment-13235675 ] Jun Rao commented on KAFKA-310: --- +1 on v2. Incomplete message set validation checks in kafka.log.Log's append API can corrupt on disk log -- Key: KAFKA-310 URL: https://issues.apache.org/jira/browse/KAFKA-310 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Neha Narkhede Priority: Critical Attachments: kafka-310-v2.patch, kafka-310.patch The behavior of the ByteBufferMessageSet's iterator is to ignore and return false if some trailing bytes are found that cannot be deserialized into a Kafka message. The append API in Log iterates through a ByteBufferMessageSet and validates the checksum of each message. However, while appending data to the log, it just uses the underlying ByteBuffer that forms the ByteBufferMessageSet. Now, due to some bug, if the ByteBuffer has some trailing data, that will get appended to the on-disk log too. This can cause corruption of the log.
[jira] [Commented] (KAFKA-309) Bug in FileMessageSet's append API can corrupt on disk log
[ https://issues.apache.org/jira/browse/KAFKA-309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13235679#comment-13235679 ] Jun Rao commented on KAFKA-309: --- +1 on the patch. Bug in FileMessageSet's append API can corrupt on disk log -- Key: KAFKA-309 URL: https://issues.apache.org/jira/browse/KAFKA-309 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7 Reporter: Neha Narkhede Assignee: Neha Narkhede Priority: Critical Attachments: kafka-309-test.patch, kafka-309.patch In FileMessageSet's append API, we write a ByteBufferMessageSet to a log in the following manner - while(written < messages.sizeInBytes) written += messages.writeTo(channel, 0, messages.sizeInBytes) In ByteBufferMessageSet, the writeTo API uses buffer.duplicate() to append to a channel - def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = channel.write(buffer.duplicate) If the channel doesn't write the ByteBuffer in one call, then we call it again until sizeInBytes bytes are written. But the next call will use buffer.duplicate() to write to the FileChannel, which will write the entire ByteBufferMessageSet again to the file. Effectively, we have a corrupted set of messages on disk. Thinking about it, FileChannel is a blocking channel, so ideally, the entire ByteBuffer should be written to the FileChannel in one call. I wrote a test (attached here) and saw that it does. But I'm not aware if there are some corner cases when it doesn't do so. In those cases, Kafka will end up corrupting the on disk log segment.
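A fix for the partial-write bug described above can be sketched as follows (an illustrative stand-in, not the attached patch): instead of re-sending `buffer.duplicate()` from offset 0 on every retry, let the channel advance the buffer's own position, so each call resumes where the previous short write stopped.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Sketch: write() advances buffer.position() past the bytes it consumed,
// so looping on hasRemaining() never re-sends already-written bytes.
public class WriteFully {
    static long writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        long written = 0;
        while (buffer.hasRemaining())
            written += channel.write(buffer);
        return written;
    }
}
```

With `buffer.duplicate()`, every retry starts from position 0, which is exactly how the duplicated messages end up on disk.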
[jira] [Commented] (KAFKA-307) Refactor server code to remove interdependencies between LogManager and KafkaZooKeeper
[ https://issues.apache.org/jira/browse/KAFKA-307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13236175#comment-13236175 ] Jun Rao commented on KAFKA-307: --- Thanks for patch v2. Looks cleaner. A couple of other comments: 3. Is it better for KafkaZooKeeper to call ReplicaManager.addLocalReplica directly, instead of going through KafkaServer? For all replicas added in KafkaZookeeper, we know they are all local and should have a log. We could call LogManager.getOrCreateLog to get the log and pass it to ReplicaManager. 4. There are unused imports in KafkaServer. Refactor server code to remove interdependencies between LogManager and KafkaZooKeeper -- Key: KAFKA-307 URL: https://issues.apache.org/jira/browse/KAFKA-307 Project: Kafka Issue Type: Bug Affects Versions: 0.7, 0.8 Reporter: Neha Narkhede Attachments: kafka-307-draft.patch, kafka-307-v2.patch Currently, LogManager wraps KafkaZooKeeper which is meant for all zookeeper interaction of a Kafka server. With replication, KafkaZookeeper will handle leader election, various state change listeners and then start replicas. Due to interdependency between LogManager and KafkaZookeeper, starting replicas is not possible until LogManager starts up completely. Due to this, we have to separate the broker startup procedures required for replication to get around this problem. It will be good to refactor and clean up the server code, before diving deeper into replication.
[jira] [Commented] (KAFKA-300) Implement leader election
[ https://issues.apache.org/jira/browse/KAFKA-300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13233885#comment-13233885 ] Jun Rao commented on KAFKA-300: --- Thanks for the patch. Some comments: 1. KafkaZookeeper: 1.1 LeaderChangeListener.handleDataDeleted: no need to first check that the leader exists, since the leader could change immediately after the check is done and subsequent logic still needs to handle this case. 1.2 TopicChangeListener: curChilds returned from the callback gives all children, not just new children. 1.3 SessionExpireListener: We could miss some new topics created when a session expires. Just calling subscribeToTopicAndPartitionsChanges may not be enough since we may need to call handleNewTopics for those missed new topics. 1.4 leaderElection probably needs to return a boolean to indicate if the election succeeded or not. 2. LogManager: 2.1 It seems to me that replicas is not directly tied to LogManager and probably should be kept somewhere else, like KafkaServer. 2.2 It also seems that we need to keep a map of (topic, partition_id, broker_id) => Replica so that we can address each replica more efficiently. This can be either a 1-level or a 2-level map. 3. DefaultEventHandler.handle: We probably should break out of the loop when outstandingProduceRequests is empty. 4. ZkUtils.waitUntilLeaderIsElected should be moved to test (sth like ZKTestUtil). 5. KafkaServer, BrokerPartitionInfo, DefaulEventHandler, LazyProducerTest, LeaderElectionTest: remove unused imports Implement leader election - Key: KAFKA-300 URL: https://issues.apache.org/jira/browse/KAFKA-300 Project: Kafka Issue Type: Sub-task Reporter: Neha Narkhede Attachments: kafka-300.patch According to the Kafka replication design (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication), this JIRA will involve implementing the leaderElection() procedure.
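Comment 2.2 above suggests a map keyed by (topic, partition_id, broker_id) for O(1) replica lookup. A minimal 1-level sketch with a composite key follows; the class and value names here are illustrative stand-ins, not Kafka's actual types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative 1-level replica map keyed by (topic, partition, broker).
public class ReplicaRegistry {
    static final class ReplicaKey {
        final String topic; final int partition; final int brokerId;
        ReplicaKey(String topic, int partition, int brokerId) {
            this.topic = topic; this.partition = partition; this.brokerId = brokerId;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof ReplicaKey)) return false;
            ReplicaKey k = (ReplicaKey) o;
            return partition == k.partition && brokerId == k.brokerId && topic.equals(k.topic);
        }
        @Override public int hashCode() { return Objects.hash(topic, partition, brokerId); }
    }

    // The String value stands in for a Replica object.
    private final Map<ReplicaKey, String> replicas = new HashMap<>();

    void put(String topic, int partition, int brokerId, String replica) {
        replicas.put(new ReplicaKey(topic, partition, brokerId), replica);
    }
    String get(String topic, int partition, int brokerId) {
        return replicas.get(new ReplicaKey(topic, partition, brokerId));
    }
}
```

A 2-level alternative (topic → partition/broker → Replica) trades the composite key for nested maps; either way the lookup avoids scanning all replicas.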
[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout
[ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13233969#comment-13233969 ] Jun Rao commented on KAFKA-305: --- Prashanth, Thanks for the patch. This is very useful. Some comments: 1. I think it makes sense for SimpleConsumer to use BlockingChannel as well. Could you change that in this patch too? 2. ProducerTest.testZKSendWithDeadBroker: This test doesn't really test the timeout on getting a response. We probably need to create a mock kafka server (that doesn't send a response) to test this out. 3. BlockingChannel: 3.1 We probably should rename timeoutMs to readTimeoutMs since only reads are subject to the timeout. 3.2 We should pass in a socketSendBufferSize and a socketReceiveBufferSize. 3.3 Should host and port be part of the constructor? It seems to me it's cleaner if each instance of BlockingChannel is tied to 1 host and 1 port. I'd also be interested in your findings on the comparison with NIO with selectors. SyncProducer does not correctly timeout --- Key: KAFKA-305 URL: https://issues.apache.org/jira/browse/KAFKA-305 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7, 0.8 Reporter: Prashanth Menon Priority: Critical Attachments: KAFKA-305-v1.patch So it turns out that using the channel in SyncProducer as we do to perform blocking reads will not trigger socket timeouts (though we set them) and will block forever, which is bad. This bug identifies the issue: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article presents a potential work-around: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel The work-around is a simple solution that involves creating a separate ReadableByteChannel instance for timeout-enabled reads.
[jira] [Commented] (KAFKA-45) Broker startup, leader election, becoming a leader/follower for intra-cluster replication
[ https://issues.apache.org/jira/browse/KAFKA-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13228774#comment-13228774 ] Jun Rao commented on KAFKA-45: -- If there are 2 followers and the leader receives an ack from only follower 1, but not follower 2 (within the timeout), the leader will kick follower 2 out of the ISR before it can commit the message and ack the producer. So, follower 2 will never get a chance to become the new leader should the current leader fail. Broker startup, leader election, becoming a leader/follower for intra-cluster replication - Key: KAFKA-45 URL: https://issues.apache.org/jira/browse/KAFKA-45 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Neha Narkhede We need to implement the logic for starting a broker with replicated partitions, the leader election logic and how to become a leader and a follower.
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13228845#comment-13228845 ] Jun Rao commented on KAFKA-49: -- Prashanth, I am ready to commit your patch. A couple of things: 1. Your patch added a new class ResponseHandler, but it's not used. Should that be removed? 2. Your concern #1 is valid. Could you create another jira to track this? 3. For your concern #2, it's ok for getMetadataApi to return an empty leader in a transition state. The client will simply back off a bit and call getMetadataApi again. Add acknowledgement to the produce request. --- Key: KAFKA-49 URL: https://issues.apache.org/jira/browse/KAFKA-49 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jun Rao Assignee: Prashanth Menon Fix For: 0.8 Attachments: KAFKA-49-v1.patch, KAFKA-49-v2.patch Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request.
[jira] [Commented] (KAFKA-296) Update Go Client to new version of Go
[ https://issues.apache.org/jira/browse/KAFKA-296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13228865#comment-13228865 ] Jun Rao commented on KAFKA-296: --- I can't seem to apply the patch. Could you rebase? patching file clients/go/tools/publisher/publisher.go Hunk #1 FAILED at 22. Hunk #2 succeeded at 87 with fuzz 2 (offset 38 lines). 1 out of 2 hunks FAILED -- saving rejects to file clients/go/tools/publisher/publisher.go.rej patching file clients/go/tools/publisher/publisher.go Hunk #1 FAILED at 23. Hunk #2 FAILED at 41. 2 out of 2 hunks FAILED -- saving rejects to file clients/go/tools/publisher/publisher.go.rej Update Go Client to new version of Go - Key: KAFKA-296 URL: https://issues.apache.org/jira/browse/KAFKA-296 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.7.1 Reporter: AaronR Attachments: go1updates.patch, kafka296-v2.patch Go (http://golang.org) is close to releasing a new version (1.0), which requires updates to the client; in the meantime most of the Go community has moved to this version. This change contains: * language changes to the existing client (os.Error, time. Signals, etc) * removes the Makefiles (no longer used by Go) * runs go fmt (formats it in standard Go), which accounts for most of the changed lines, from spaces to tabs * updates the import path to allow for go get installs (no need to get source and build) Not sure which versions this should apply to, but I think it should go to 0.7 and newer.
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13225321#comment-13225321 ] Jun Rao commented on KAFKA-49: -- Prashanth, Thanks for the patch. Overall, it looks pretty good. Some comments: 1. KafkaApis: Even when the produce request requires no ack, we will still need to send an error code back. So we should always send a ProduceResponse. When no ack is specified, we probably don't need to send offsets back. 2. ProduceRequest.getNumMessages: rename it to something like getNumTopicPartitions 3. AsyncProducerTest.testDefaultHandlerRetryLogic: doesn't really test retry 4. SyncProducerTest.testProduceBlocksWhenRequired: Use createTopic instead of creating the ZK path directly. 5. I agree that it's probably better to use Seq in our requests/responses, instead of Array. Then we need a java version to convert Seq to java array and vice versa. Please open a separate jira to track this. Add acknowledgement to the produce request. --- Key: KAFKA-49 URL: https://issues.apache.org/jira/browse/KAFKA-49 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jun Rao Assignee: Prashanth Menon Fix For: 0.8 Attachments: KAFKA-49-v1.patch Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request.
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13225845#comment-13225845 ] Jun Rao commented on KAFKA-49: -- 1. Or we can just treat no acks the same as ack=1 for now. Add acknowledgement to the produce request. --- Key: KAFKA-49 URL: https://issues.apache.org/jira/browse/KAFKA-49 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jun Rao Assignee: Prashanth Menon Fix For: 0.8 Attachments: KAFKA-49-v1.patch Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request.
[jira] [Commented] (KAFKA-273) Occassional GZIP errors on the server while writing compressed data to disk
[ https://issues.apache.org/jira/browse/KAFKA-273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13224538#comment-13224538 ] Jun Rao commented on KAFKA-273: --- The patch for EOF looks fine. We probably need to do some system test to make sure this doesn't introduce new problems, especially when the compressed size is relatively large. Once that test is done, we can commit the patch. Occassional GZIP errors on the server while writing compressed data to disk --- Key: KAFKA-273 URL: https://issues.apache.org/jira/browse/KAFKA-273 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7 Reporter: Neha Narkhede Assignee: Neha Narkhede Attachments: kafka-273.patch Occasionally, we see the following errors on the Kafka server - 2012/02/08 14:58:21.832 ERROR [KafkaRequestHandlers] [kafka-processor-6] [kafka] Error processing MultiProducerRequest on NusImpressionSetEvent:0 java.io.EOFException: Unexpected end of ZLIB input stream at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:223) at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:141) at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:92) at java.io.FilterInputStream.read(FilterInputStream.java:90) at kafka.message.GZIPCompression.read(CompressionUtils.scala:52) at kafka.message.CompressionUtils$$anonfun$decompress$1.apply$mcI$sp(CompressionUtils.scala:143) at kafka.message.CompressionUtils$$anonfun$decompress$1.apply(CompressionUtils.scala:143) at kafka.message.CompressionUtils$$anonfun$decompress$1.apply(CompressionUtils.scala:143) at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598) at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549) at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394) at
scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549) at scala.collection.immutable.Stream.foreach(Stream.scala:255) at kafka.message.CompressionUtils$.decompress(CompressionUtils.scala:143) at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:119) at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:132) at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:81) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) at scala.collection.Iterator$class.foreach(Iterator.scala:631) at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) at kafka.message.MessageSet.foreach(MessageSet.scala:87) at kafka.log.Log.append(Log.scala:204) at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:70) at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:63) at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:63) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:63) at 
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:42) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:42) at kafka.network.Processor.handle(SocketServer.scala:297) at kafka.network.Processor.read(SocketServer.scala:320) at kafka.network.Processor.run(SocketServer.scala:215) at
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13223407#comment-13223407 ] Jun Rao commented on KAFKA-49: -- Prashanth, Yes, that's actually a real bug. Instead of returning from inside the retry loop, we should just set a boolean to indicate that a send has succeeded. At the end, we will throw FailedToSendMessageException if the boolean is not set. Otherwise, we will continue with the for loop. Could you file a separate jira to fix that? Thanks for catching the bug. Add acknowledgement to the produce request. --- Key: KAFKA-49 URL: https://issues.apache.org/jira/browse/KAFKA-49 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Prashanth Menon Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request.
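The retry fix described in the comment can be sketched as follows. This is an illustrative stand-in, not Kafka's DefaultEventHandler: the exception class and the Consumer-based sender are hypothetical, and only the control flow (success flag instead of early return, throw only after all attempts fail) mirrors the suggestion.

```java
import java.util.function.Consumer;

// Sketch: track success with a boolean instead of returning inside the loop.
public class RetrySend {
    static class FailedToSendException extends RuntimeException {
        FailedToSendException(String msg) { super(msg); }
    }

    static void sendWithRetries(Consumer<String> sender, String batch, int maxRetries) {
        boolean sent = false;
        for (int attempt = 0; attempt <= maxRetries && !sent; attempt++) {
            try {
                sender.accept(batch);  // the real handler would also refresh topic metadata between attempts
                sent = true;
            } catch (RuntimeException e) {
                // log and fall through to the next attempt
            }
        }
        if (!sent)
            throw new FailedToSendException("giving up after " + (maxRetries + 1) + " attempts");
    }
}
```

Returning directly from inside the loop is what caused the original bug: a success on one partition could short-circuit the remaining sends.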
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13222700#comment-13222700 ] Jun Rao commented on KAFKA-49: -- Prashanth, That's a good question. Initially, I was just thinking that for async producers, if we get an error during send (after retries), we will just log the error without telling the client. Currently, the event handler is not really extensible. I can imagine that we add some kind of callback to return those errors. The question is what will the client do on those errors. Will it resend? If so, we will need to pass the failed requests through the callback too. I am curious about how other messaging systems like ActiveMQ behave in the async mode. Add acknowledgement to the produce request. --- Key: KAFKA-49 URL: https://issues.apache.org/jira/browse/KAFKA-49 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Prashanth Menon Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request.
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13222779#comment-13222779 ]

Jun Rao commented on KAFKA-49:

The current DefaultEventHandler already refreshes topic metadata on retries. So, if we return any failed request to the client, there is probably not much the client can do, except for logging it. In any case, we should use a separate jira to track whether we need any async callback on the producer side.

Add acknowledgement to the produce request.
Key: KAFKA-49
URL: https://issues.apache.org/jira/browse/KAFKA-49
Project: Kafka
Issue Type: Bug
Reporter: Jun Rao
Assignee: Prashanth Menon

Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request.
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13221183#comment-13221183 ]

Jun Rao commented on KAFKA-240:

Joe, kafka-239 has been committed to the 0.8 branch. You are good to go. After rebasing your code, make sure all unit tests still pass.

implement new producer and consumer request format
Key: KAFKA-240
URL: https://issues.apache.org/jira/browse/KAFKA-240
Project: Kafka
Issue Type: Sub-task
Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Labels: fetch, replication, wireprotocol
Fix For: 0.8
Attachments: KAFKA-240-FetchRequest-v1.patch, KAFKA-240-FetchRequest-v2.patch, KAFKA-240-FetchRequest-validation-v1.patch, KAFKA-240.ProducerRequest.v2.patch, KAFKA-240.ProducerRequest.v3.patch, KAFKA-240.ProducerRequest.v4.patch, KAFKA-240.v3.patch, kafka-240_unittestfix_delta.patch

We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13221318#comment-13221318 ]

Jun Rao commented on KAFKA-240:

Yes; however, that information is not passed into ProduceRequest, so the server doesn't know the version of the producer client.

implement new producer and consumer request format
Key: KAFKA-240
URL: https://issues.apache.org/jira/browse/KAFKA-240
Project: Kafka
Issue Type: Sub-task
Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Labels: fetch, replication, wireprotocol
Fix For: 0.8
Attachments: KAFKA-240-FetchRequest-v1.patch, KAFKA-240-FetchRequest-v2.patch, KAFKA-240-FetchRequest-validation-v1.patch, KAFKA-240.ProducerRequest.v2.patch, KAFKA-240.ProducerRequest.v3.patch, KAFKA-240.ProducerRequest.v4.patch, KAFKA-240.v3.patch, kafka-240_unittestfix_delta.patch

We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal
[jira] [Commented] (KAFKA-239) Wire existing producer and consumer to use the new ZK data structure
[ https://issues.apache.org/jira/browse/KAFKA-239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13220155#comment-13220155 ]

Jun Rao commented on KAFKA-239:

Please address items 2 and 11. For 4, I am fine leaving the ZK check in LogManager for now. Please add a comment that this will be fixed later. Unit tests seem to hang consistently for me; they seem to hang on testLatestOffsetResetForward(kafka.integration.AutoOffsetResetTest).

Wire existing producer and consumer to use the new ZK data structure
Key: KAFKA-239
URL: https://issues.apache.org/jira/browse/KAFKA-239
Project: Kafka
Issue Type: Sub-task
Components: core
Reporter: Jun Rao
Assignee: Neha Narkhede
Attachments: kafka-239-latest-revision-unit-tests-passing.patch, kafka-239-v2.patch, kafka-239-v3.patch

We can assume the leader of a partition is always the first replica. Data will only be stored in the first replica. So, there is no fault-tolerance support yet. Just make the partition logical.
[jira] [Commented] (KAFKA-50) kafka intra-cluster replication support
[ https://issues.apache.org/jira/browse/KAFKA-50?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13220290#comment-13220290 ]

Jun Rao commented on KAFKA-50:

Yes, we can start a new wiki for each of the sub-jiras, if needed.

kafka intra-cluster replication support
Key: KAFKA-50
URL: https://issues.apache.org/jira/browse/KAFKA-50
Project: Kafka
Issue Type: New Feature
Reporter: Jun Rao
Assignee: Jun Rao
Fix For: 0.8
Attachments: kafka_replication_detailed_design_v2.pdf, kafka_replication_highlevel_design.pdf, kafka_replication_lowlevel_design.pdf

Currently, Kafka doesn't have replication. Each log segment is stored in a single broker. This limits both the availability and the durability of Kafka. If a broker goes down, all log segments stored on that broker become unavailable to consumers. If a broker dies permanently (e.g., disk failure), all unconsumed data on that node is lost forever. Our goal is to replicate every log segment to multiple broker nodes to improve both the availability and the durability.

We'd like to support the following in Kafka replication:
1. Configurable synchronous and asynchronous replication
2. Small unavailable window (e.g., less than 5 seconds) during broker failures
3. Auto recovery when a failed broker rejoins
4. Balanced load when a broker fails (i.e., the load on the failed broker is evenly spread among multiple surviving brokers)
[jira] [Commented] (KAFKA-239) Wire existing producer and consumer to use the new ZK data structure
[ https://issues.apache.org/jira/browse/KAFKA-239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13219370#comment-13219370 ]

Jun Rao commented on KAFKA-239:

1. ZkUtils.getTopicPartitionsPath: There are a couple of reasons why we chose to use /brokers/topics/partitions to store partition data, instead of /brokers/topics. (1) This distinguishes the ZK layout in 0.8 from 0.7. In 0.7, we already store broker ids directly under /brokers/topics, so there could be confusion if we put partition ids at the same level. (2) This also leaves room for future extension to add non-partition-level per-topic info in ZK.
2. ZkUtils: rename idoesBrokerHostPartition to something like isPartitionOnBroker.
3. LogManager: logFlusherScheduler.startUp is called twice.
4. LogManager.getOrCreateLog: The LogManager shouldn't need to know anything about ZK. The check of whether a partition exists on a broker should be done at the KafkaApi level. Ideally, we just want to check partition ownership from a local cache, which will be populated by ZK listeners (part of kafka-44). In this patch, we can either not check at all or check ZK directly (with the intention to have it optimized in kafka-44).
5. KafkaServer: the log cleaner scheduler is scheduled in LogManager; should we start up the scheduler there too? If there is a good reason to do that, should we protect startUp from being called more than once?
6. KafkaScheduler: what's the benefit of having a startUp method? It seems creating the executor in the constructor is simpler.
7. Partition should be made logical. It only contains the topic and partition id. There will be a new entity, Replica, which is associated with a broker id.
8. ProducerPool: remove the unused import and fix the indentation in close().
9. AsyncProducerTest: there is duplicated code that sets up mocks to form the expected partition metadata. Can we have a separate method to share the code?
10. Producer: is zkClient in the constructor just for testing? If so, add a comment to indicate that.
11. TestUtils: why do we need checkSetEqual? Doesn't Scala do that by default?
12. ZookeeperConsumerConnectorTest.testLederSelectionForPartition: is it really testing leader election? It seems to be testing partition ownership.
13. ZkUtils.getLeaderForPartition: this is probably fine for this patch. However, we should think about whether it is better to get leader info from ZK directly or to use the getMetaData api.

Wire existing producer and consumer to use the new ZK data structure
Key: KAFKA-239
URL: https://issues.apache.org/jira/browse/KAFKA-239
Project: Kafka
Issue Type: Sub-task
Components: core
Reporter: Jun Rao
Assignee: Neha Narkhede
Attachments: kafka-239-latest-revision-unit-tests-passing.patch, kafka-239-v2.patch

We can assume the leader of a partition is always the first replica. Data will only be stored in the first replica. So, there is no fault-tolerance support yet. Just make the partition logical.
[jira] [Commented] (KAFKA-279) kafka-console-producer does not take in customized values of --batch-size or --timeout
[ https://issues.apache.org/jira/browse/KAFKA-279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13217331#comment-13217331 ]

Jun Rao commented on KAFKA-279:

That typically means that you are sending data at a rate faster than the broker can persist. Try increasing flush.interval to something like 1 on the broker to increase server throughput.

kafka-console-producer does not take in customized values of --batch-size or --timeout
Key: KAFKA-279
URL: https://issues.apache.org/jira/browse/KAFKA-279
Project: Kafka
Issue Type: Bug
Components: contrib
Affects Versions: 0.7
Environment: Ubuntu 10.04, openjdk1.6 with default installation of 0.7
Reporter: milind parikh
Priority: Minor
Attachments: kafka-279.patch

1. While the default console-producer/console-consumer paradigm works great, when I try modifying the batch size:

bin/kafka-console-producer.sh --batch-size 300 --zookeeper localhost:2181 --topic test1

it gives me:

Exception in thread "main" java.lang.NumberFormatException: null
at java.lang.Integer.parseInt(Integer.java:443)
at java.lang.Integer.parseInt(Integer.java:514)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at kafka.utils.Utils$.getIntInRange(Utils.scala:189)
at kafka.utils.Utils$.getInt(Utils.scala:174)
at kafka.producer.async.AsyncProducerConfigShared$class.$init$(AsyncProducerConfig.scala:45)
at kafka.producer.ProducerConfig.init(ProducerConfig.scala:25)
at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:108)
at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)

I have looked at the code and can't figure out what's wrong.

2. When I do:

bin/kafka-console-producer.sh --timeout 3 --zookeeper localhost:2181 --topic test1

I would think that console-producer would wait for 30s if the batch size (default 200) is not full. It doesn't. It takes the same time as without the timeout parameter (default 1000) and dumps whatever is in the batch.

Resolution from Jun:
1. The code does the following to set the batch size: props.put("batch.size", batchSize). Instead, it should do props.put("batch.size", batchSize.toString).
2. It sets the wrong property name for the timeout. Instead of props.put("queue.enqueueTimeout.ms", sendTimeout.toString), it should do props.put("queue.time", sendTimeout.toString).
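The resolution above comes down to storing String values under the right property keys; a minimal sketch of the corrected property setup (property names as given in the comment, helper name hypothetical):

```scala
import java.util.Properties

// Build the console-producer config with correctly typed values.
def consoleProducerProps(batchSize: Int, sendTimeoutMs: Long): Properties = {
  val props = new Properties()
  // Properties values must be Strings: putting an Int stores an Object that
  // getProperty() cannot see, which is why the original code blew up with
  // NumberFormatException: null when parsing the batch size.
  props.put("batch.size", batchSize.toString)
  // Use queue.time (not queue.enqueueTimeout.ms) for the batching timeout.
  props.put("queue.time", sendTimeoutMs.toString)
  props
}
```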
[jira] [Commented] (KAFKA-286) consumer sometimes don't release partition ownership properly in ZK during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13217375#comment-13217375 ]

Jun Rao commented on KAFKA-286:

Will this happen? It doesn't seem possible to me. In step 2, when we release 0-0 and 0-1 during rebalance, we clear topicRegistry. Since this rebalance fails, topicRegistry will not be populated. So, in step 4, there is nothing to release for c1.

consumer sometimes don't release partition ownership properly in ZK during rebalance
Key: KAFKA-286
URL: https://issues.apache.org/jira/browse/KAFKA-286
Project: Kafka
Issue Type: Bug
Components: core
Reporter: Jun Rao
Assignee: Jun Rao
Fix For: 0.7.1
Attachments: kafka-286.patch, kafka-286_v2.patch
[jira] [Commented] (KAFKA-249) Separate out Kafka mirroring into a stand-alone app
[ https://issues.apache.org/jira/browse/KAFKA-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13217513#comment-13217513 ]

Jun Rao commented on KAFKA-249:

Could we support subscribing to wildcard topics directly in ConsumerConnector? We could add a new api like the following:

createMessageStream(topicRegex: String) : KafkaMessageStream

All topics that match topicRegex will be returned in a single message stream, which can then be iterated.

Separate out Kafka mirroring into a stand-alone app
Key: KAFKA-249
URL: https://issues.apache.org/jira/browse/KAFKA-249
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
Fix For: 0.7.1
Attachments: KAFKA-249.v1.patch

I would like to discuss on this jira the feasibility/benefits of separating out Kafka's mirroring feature from the broker into a stand-alone app, as it currently has a couple of limitations and issues. For example, we recently had to deal with Kafka mirrors that were in fact idle: mirror threads were not created at start-up due to a rebalancing exception, but the Kafka broker itself did not shut down. This has since been fixed, but it is indicative of (avoidable) problems in embedding non-broker-specific features in the broker.

Logically, it seems to make sense to separate it out to achieve better division of labor. Furthermore, enhancements to mirroring may be less clunky to implement and use with a stand-alone app. For example, to support custom partitioning on the target cluster, or to mirror from multiple clusters, we would probably need to be able to pass in multiple embedded consumer/embedded producer configs, which would be less ugly if the mirroring process were a stand-alone app.

Also, if we break it out, it would be convenient to use as a consumption engine for the console consumer, which will make it easier to add on features such as wildcards in topic consumption, since it contains a ZooKeeper topic discovery component. Any suggestions and/or objections to this?
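The proposed createMessageStream(topicRegex) would internally match discovered topics against the subscription pattern. The selection step might look like this (a hypothetical helper for illustration, not the actual ConsumerConnector code):

```scala
// Keep only the topics whose full name matches the subscription pattern;
// all of them would feed a single KafkaMessageStream.
def matchingTopics(topicRegex: String, knownTopics: Seq[String]): Seq[String] = {
  val pattern = topicRegex.r
  knownTopics.filter(t => pattern.pattern.matcher(t).matches())
}
```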
[jira] [Commented] (KAFKA-44) Various ZK listeners to support intra-cluster replication
[ https://issues.apache.org/jira/browse/KAFKA-44?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13215728#comment-13215728 ]

Jun Rao commented on KAFKA-44:

It may be possible, but this can be a bit tricky. When you move a partition, you want to wait until the partition on the new broker has fully caught up before deleting the one on the old broker. So, one way to achieve this is to have another ZK path that indicates this transition state. After the transition is done, the new assignment will be added to the assigned_partition path. In any case, let's start by just focusing on static partition assignment. We can worry about partition reassignment a bit later when we get to kafka-42.

Various ZK listeners to support intra-cluster replication
Key: KAFKA-44
URL: https://issues.apache.org/jira/browse/KAFKA-44
Project: Kafka
Issue Type: Bug
Reporter: Jun Rao

We need to implement the new ZK listeners for the new paths registered in ZK.
[jira] [Commented] (KAFKA-45) Broker startup, leader election, becoming a leader/follower for intra-cluster replication
[ https://issues.apache.org/jira/browse/KAFKA-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13213033#comment-13213033 ]

Jun Rao commented on KAFKA-45:

Prashanth, that's a good question. We'd like to make replicas identical to each other. This is easy for size-based log retention, but harder for time-based log retention. What you proposed is to have only the leader delete old log segments and propagate this information to the followers. This seems like a reasonable approach. The question is how the leader should communicate such information to the followers. One possibility is to piggyback on the FetchResponse returned to the followers. This will mean some extra optional fields in FetchResponse.

Broker startup, leader election, becoming a leader/follower for intra-cluster replication
Key: KAFKA-45
URL: https://issues.apache.org/jira/browse/KAFKA-45
Project: Kafka
Issue Type: Bug
Reporter: Jun Rao

We need to implement the logic for starting a broker with replicated partitions, the leader election logic, and how to become a leader and a follower.
[jira] [Commented] (KAFKA-44) Various ZK listeners to support intra-cluster replication
[ https://issues.apache.org/jira/browse/KAFKA-44?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13213283#comment-13213283 ]

Jun Rao commented on KAFKA-44:

Not sure if I follow exactly what you are proposing here. In particular, what does the broker recreate? My thinking is the following: /brokers/[brokerId]/new_partition/[topic:partitionId] is the source of truth about what topic/partitions a broker has, and that path is only created during topic creation. A broker listens to that path to pick up new topic/partitions assigned to it. When a broker starts up, it simply reads all /brokers/[brokerId]/new_partition/[topic:partitionId] to determine the set of topic/partitions that it should have. So the broker never needs to recreate that path.

Various ZK listeners to support intra-cluster replication
Key: KAFKA-44
URL: https://issues.apache.org/jira/browse/KAFKA-44
Project: Kafka
Issue Type: Bug
Reporter: Jun Rao

We need to implement the new ZK listeners for the new paths registered in ZK.
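The startup behavior Jun describes, reading the assignment path rather than recreating it, could be sketched as follows. This is a sketch only: `readChildren` is a stand-in for whatever ZK client call lists child nodes, and the path layout follows the comment:

```scala
case class TopicPartition(topic: String, partition: Int)

// readChildren is assumed to return the child node names under a ZK path,
// e.g. Seq("orders:0", "clicks:3") under /brokers/1/new_partition.
// On startup, the broker derives its assigned partitions purely by reading;
// it never writes to (or recreates) this source-of-truth path.
def partitionsForBroker(brokerId: Int, readChildren: String => Seq[String]): Set[TopicPartition] = {
  val path = "/brokers/" + brokerId + "/new_partition"
  readChildren(path).map { child =>
    val Array(topic, part) = child.split(":")
    TopicPartition(topic, part.toInt)
  }.toSet
}
```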
[jira] [Commented] (KAFKA-277) Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function
[ https://issues.apache.org/jira/browse/KAFKA-277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13212133#comment-13212133 ]

Jun Rao commented on KAFKA-277:

Some comments:
1. The variable event just binds to every item in the sequence via the foreach method. There is no need to rename it, since each item in processedEvents is supposed to be a single event.
2. In ByteBufferMessageSet, instead of duplicating code in shallowIterator, could we rename deepIterator to internalIterator and add a flag to control whether we do shallow or deep iteration? In general, we don't want to expose the shallow iterator externally. So, it's better if we just add a verifyMessageSize method in ByteBufferMessageSet that uses the shallow iterator.

Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function
Key: KAFKA-277
URL: https://issues.apache.org/jira/browse/KAFKA-277
Project: Kafka
Issue Type: Bug
Reporter: Yang Ye
Attachments: shallow_iterator.patch

A shallow iterator just traverses the first-level messages of a ByteBufferMessageSet; compressed messages won't be decompressed and treated individually.
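Jun's suggestion in item 2, one internalIterator with a flag instead of duplicated shallow/deep code, might look like the following. This is a simplified sketch over a Seq of messages, where a "message" may carry compressed inner messages; the real ByteBufferMessageSet works over a ByteBuffer:

```scala
case class Message(payload: String, inner: Seq[Message] = Seq.empty)

// One iterator with a flag, rather than separate shallowIterator/deepIterator
// implementations that duplicate the traversal logic.
def internalIterator(messages: Seq[Message], isShallow: Boolean): Iterator[Message] =
  if (isShallow)
    messages.iterator                        // first-level messages only; no decompression
  else
    messages.iterator.flatMap { m =>
      // Deep iteration expands a compressed message set into its inner messages.
      if (m.inner.nonEmpty) m.inner.iterator else Iterator(m)
    }
```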
[jira] [Commented] (KAFKA-268) Add another reconnection condition to the syncProducer: the time elapsed since last connection
[ https://issues.apache.org/jira/browse/KAFKA-268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13212169#comment-13212169 ]

Jun Rao commented on KAFKA-268:

Committed the improvement to trunk.

Add another reconnection condition to the syncProducer: the time elapsed since last connection
Key: KAFKA-268
URL: https://issues.apache.org/jira/browse/KAFKA-268
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.7
Reporter: Yang Ye
Priority: Minor
Fix For: 0.7.1
Attachments: kafka-reconnect-time.patch, random_last_connection_time.patch, time_based_reconnect_feature.patch

Add another reconnection condition to the syncProducer: the time elapsed since the last connection. If it's larger than the pre-specified threshold, close the connection and reopen it.
[jira] [Commented] (KAFKA-268) Add another reconnection condition to the syncProducer: the time elapsed since last connection
[ https://issues.apache.org/jira/browse/KAFKA-268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13210382#comment-13210382 ]

Jun Rao commented on KAFKA-268:

A suggestion: it's possible that we have a number of producer clients all started at around the same time. Then they will all reconnect at almost exactly the same time. This is probably not good for load balancing. What we want is to spread out the reconnects. One way to do that is to initialize lastConnectionTime to a random timestamp between (now - reconnectTimeInterval) and now.

Add another reconnection condition to the syncProducer: the time elapsed since last connection
Key: KAFKA-268
URL: https://issues.apache.org/jira/browse/KAFKA-268
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.7
Reporter: Yang Ye
Priority: Minor
Fix For: 0.7.1
Attachments: kafka-reconnect-time.patch, time_based_reconnect_feature.patch

Add another reconnection condition to the syncProducer: the time elapsed since the last connection. If it's larger than the pre-specified threshold, close the connection and reopen it.
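The jitter idea could be sketched like this (a sketch of the idea, not the committed patch; the function name is illustrative):

```scala
import scala.util.Random

// Pick an initial lastConnectionTime uniformly in [now - interval, now], so a
// fleet of producers started together does not reconnect in lockstep later on.
def initialLastConnectionTime(nowMs: Long, reconnectIntervalMs: Long, rand: Random): Long =
  nowMs - (rand.nextDouble() * reconnectIntervalMs).toLong
```

Since nextDouble() is uniform on [0, 1), each client's first time-based reconnect lands at a different point within one interval, after which they stay spread out.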
[jira] [Commented] (KAFKA-268) Add another reconnection condition to the syncProducer: the time elapsed since last connection
[ https://issues.apache.org/jira/browse/KAFKA-268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13210620#comment-13210620 ]

Jun Rao commented on KAFKA-268:

How about the following?

private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval

Add another reconnection condition to the syncProducer: the time elapsed since last connection
Key: KAFKA-268
URL: https://issues.apache.org/jira/browse/KAFKA-268
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.7
Reporter: Yang Ye
Priority: Minor
Fix For: 0.7.1
Attachments: kafka-reconnect-time.patch, random_last_connection_time.patch, time_based_reconnect_feature.patch

Add another reconnection condition to the syncProducer: the time elapsed since the last connection. If it's larger than the pre-specified threshold, close the connection and reopen it.
[jira] [Commented] (KAFKA-265) Add a queue of zookeeper notifications in the zookeeper consumer to reduce the number of rebalancing attempts
[ https://issues.apache.org/jira/browse/KAFKA-265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13209992#comment-13209992 ]

Jun Rao commented on KAFKA-265:

Committed the fix for both Condition and shutdown to trunk.

Add a queue of zookeeper notifications in the zookeeper consumer to reduce the number of rebalancing attempts
Key: KAFKA-265
URL: https://issues.apache.org/jira/browse/KAFKA-265
Project: Kafka
Issue Type: Sub-task
Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Jun Rao
Fix For: 0.7.1
Attachments: kafka-265.patch, kafka-265_await_fix.patch, kafka-265_shutdown.patch, kafka-265_v2.patch
Original Estimate: 168h
Remaining Estimate: 168h

The correct fix for KAFKA-262 and other known issues with the current consumer rebalancing approach is to get rid of the cache in the zookeeper consumer. The side effect of that fix, though, is the large number of zookeeper notifications that will trigger a full rebalance operation on the consumer. Ideally, the zookeeper notifications can be batched and only one rebalance operation triggered for several such ZK notifications.
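The point of the notification queue is to coalesce a burst of ZK notifications into a single rebalance. A minimal sketch of that coalescing step (the queue and function names are illustrative, not the actual consumer code):

```scala
import java.util.concurrent.LinkedBlockingQueue

val notifications = new LinkedBlockingQueue[String]()

// Drain everything queued so far and trigger at most ONE rebalance for the
// whole burst, instead of one rebalance per individual notification.
def processBatch(rebalance: () => Unit): Int = {
  var drained = 0
  while (notifications.poll() != null) drained += 1
  if (drained > 0) rebalance()
  drained
}
```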
[jira] [Commented] (KAFKA-274) Handle corrupted messages cleanly
[ https://issues.apache.org/jira/browse/KAFKA-274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13209993#comment-13209993 ]

Jun Rao commented on KAFKA-274:

+1 for the patch.

Handle corrupted messages cleanly
Key: KAFKA-274
URL: https://issues.apache.org/jira/browse/KAFKA-274
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Fix For: 0.7.1
Attachments: KAFKA-274.patch, KAFKA-274_v2.patch

This is related to KAFKA-273 and is filed to improve the code to have more defensive checks against corrupted data. The data could get corrupted either due to a Kafka bug or due to bit flipping on the network. The message iterator should check whether the message is valid before trying to decompress it. On the producer side, defensive checks can be turned on to verify the messages before writing them to the socket.
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13208691#comment-13208691 ]

Jun Rao commented on KAFKA-240:

Thanks Prashanth, excellent patch. Committed the Validation v1 patch to 0.8.

implement new producer and consumer request format
Key: KAFKA-240
URL: https://issues.apache.org/jira/browse/KAFKA-240
Project: Kafka
Issue Type: Sub-task
Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Labels: fetch, replication, wireprotocol
Fix For: 0.8
Attachments: KAFKA-240-FetchRequest-v1.patch, KAFKA-240-FetchRequest-v2.patch, KAFKA-240-FetchRequest-validation-v1.patch, KAFKA-240.v3.patch

We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13206927#comment-13206927 ]

Jun Rao commented on KAFKA-240:

Prashanth, another thing: we don't enforce that a given topic appears in at most one OffsetDetail in a FetchRequest. We will need to do that, since the FetchResponse code assumes it.

implement new producer and consumer request format
Key: KAFKA-240
URL: https://issues.apache.org/jira/browse/KAFKA-240
Project: Kafka
Issue Type: Sub-task
Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Labels: fetch, replication, wireprotocol
Fix For: 0.8
Attachments: KAFKA-240-FetchRequest-v1.patch, KAFKA-240-FetchRequest-v2.patch, KAFKA-240.v3.patch

We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal
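The validation Jun asks for, each topic appearing in at most one OffsetDetail, could be a simple duplicate check on request construction. A sketch, with OffsetDetail reduced to just its topic field for illustration:

```scala
// Reject a FetchRequest whose OffsetDetails mention the same topic twice,
// since the FetchResponse code assumes at most one OffsetDetail per topic.
def validateUniqueTopics(offsetDetailTopics: Seq[String]): Unit = {
  val dups = offsetDetailTopics.groupBy(identity).collect { case (t, v) if v.size > 1 => t }
  require(dups.isEmpty, "topics appear in more than one OffsetDetail: " + dups.mkString(", "))
}
```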
[jira] [Commented] (KAFKA-268) Add another reconnection condition to the syncProducer: the time elapsed since last connection
[ https://issues.apache.org/jira/browse/KAFKA-268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13206955#comment-13206955 ] Jun Rao commented on KAFKA-268: --- Some comments: 1. SyncProducerConfigShared: change reconnectTimeInterval to reconnectTimeIntervalMs and reconnect.timeInterval to reconnect.time.interval.ms, to be consistent with existing naming convention. 2. There is no real change to ByteBufferMessageSet and Utils. Please revert them. 3. The default value of reconnectTimeInterval is too small. Let's set it to 10 minutes. Also, we probably need a way to turn off time-based reconnect. How about using a negative value to indicate that? Let's also add a comment in the code to document that. Add another reconnection condition to the syncProducer: the time elapsed since last connection Key: KAFKA-268 URL: https://issues.apache.org/jira/browse/KAFKA-268 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: Yang Ye Priority: Minor Attachments: kafka-reconnect-time.patch Add another reconnection condition to the syncProducer: the time elapsed since last connection. If it's larger than the pre-specified threshold, close the connection and reopen it. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
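Jun's point 3 above (use a negative value to turn off time-based reconnect) could look like the following sketch; the class and field names here are illustrative, not the actual SyncProducer code:

```java
public class ReconnectPolicy {
    private final long reconnectTimeIntervalMs; // e.g. 10 minutes; negative disables time-based reconnect
    private long lastConnectionTimeMs;

    public ReconnectPolicy(long reconnectTimeIntervalMs, long nowMs) {
        this.reconnectTimeIntervalMs = reconnectTimeIntervalMs;
        this.lastConnectionTimeMs = nowMs;
    }

    /** True when the connection should be closed and reopened. */
    public boolean shouldReconnect(long nowMs) {
        // A negative interval turns time-based reconnect off entirely.
        if (reconnectTimeIntervalMs < 0) return false;
        return nowMs - lastConnectionTimeMs >= reconnectTimeIntervalMs;
    }

    /** Record the reconnect so the timer restarts. */
    public void onReconnect(long nowMs) {
        lastConnectionTimeMs = nowMs;
    }
}
```

With the proposed 10-minute default, a producer connected at time 0 would not reconnect at 1 second but would at 600 seconds; a negative setting never reconnects on time alone.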
[jira] [Commented] (KAFKA-270) sync producer / consumer test producing lot of kafka server exceptions not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html
[ https://issues.apache.org/jira/browse/KAFKA-270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13207023#comment-13207023 ] Jun Rao commented on KAFKA-270: --- A few things to try: 1. It seems that there is ZK session expiration in the client. This should be rare. If it's frequent, it's very likely caused by client GC. Please check your GC log. 2. Enable debug level logging in FileMessageSet in the broker. You will see the flush time for each log write. See if the flush time is reasonable (typically low 10s of ms) since it controls how many IOs a broker can do per second. sync producer / consumer test producing lot of kafka server exceptions not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html -- Key: KAFKA-270 URL: https://issues.apache.org/jira/browse/KAFKA-270 Project: Kafka Issue Type: Bug Components: clients, core Affects Versions: 0.7 Environment: Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 GNU/Linux ext3 file system with raid10 Reporter: Praveen Ramachandra Labels: clients, core, newdev, performance I am getting ridiculously low producer and consumer throughput. I am using default config values for producer, consumer and broker which are very good starting points, as they should yield sufficient throughput. Appreciate if you point what settings/changes-in-code needs to be done to get higher throughput. I changed num.partitions in the server.config to 10. Please look below for exception and error messages from the server BTW: I am running server, zookeeper, producer, consumer on the same host. 
Consumer Code =====

    long startTime = System.currentTimeMillis();
    long endTime = startTime + runDuration*1000L;
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");
    props.put("groupid", subscriptionName); // to support multiple subscribers
    props.put("zk.sessiontimeout.ms", "400");
    props.put("zk.synctime.ms", "200");
    props.put("autocommit.interval.ms", "1000");
    consConfig = new ConsumerConfig(props);
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topicName, new Integer(1)); // the topic to subscribe to
    Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
    ConsumerIterator<Message> it = stream.iterator();
    while (System.currentTimeMillis() <= endTime) {
        it.next(); // discard data
        consumeMsgCount.incrementAndGet();
    }

===== End consumer CODE ===== Producer CODE =====

    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("zk.connect", "localhost:2181");
    // Use random partitioner. Don't need the key type. Just set it to Integer.
    // The message is of type String.
    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    long endTime = startTime + runDuration*1000L; // run duration is in seconds
    while (System.currentTimeMillis() <= endTime) {
        String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
        producer.send(new ProducerData<Integer, String>(topicName, msg));
        pc.incrementAndGet();
    }
    java.util.Date date = new java.util.Date(System.currentTimeMillis());
    System.out.println(date + " :: stopped producer for topic " + topicName);

===== END Producer CODE =====

I see a bunch of exceptions like this:

[2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
    at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
    at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
    at kafka.network.MultiSend.writeTo(Transmission.scala:95)
    at kafka.network.Processor.write(SocketServer.scala:332)
    at kafka.network.Processor.run(SocketServer.scala:209)
    at java.lang.Thread.run(Thread.java:662)
java.io.IOException: Connection reset by peer at
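Jun's suggestion 2 above (debug-level logging in FileMessageSet to see per-write flush times) is typically a one-line change in the broker's log4j.properties; the logger name below assumes the standard class-name-based logger convention:

```properties
# Log the flush time of each log write on the broker (typically low 10s of ms)
log4j.logger.kafka.message.FileMessageSet=DEBUG
```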
[jira] [Commented] (KAFKA-47) Create topic support and new ZK data structures for intra-cluster replication
[ https://issues.apache.org/jira/browse/KAFKA-47?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13203725#comment-13203725 ] Jun Rao commented on KAFKA-47: -- Prashanth, good question. Yes, we could continue using the current log structure {log.dir}/topicname-partitionid. The only thing is that we would like to store some per topic metadata on disk, e.g., the version id (creation time) of a topic (to deal with some of the edge cases during topic re-creation). With the current structure, we either have to duplicate the topic metadata in each partition directory or deterministically pick one partition (like the smallest one) to store the metadata. Neither is ideal. It's much cleaner if we use the new structure {log.dir}/topicname/partitionid. Then the topic metadata can be stored under {log.dir}/topicname. Create topic support and new ZK data structures for intra-cluster replication - Key: KAFKA-47 URL: https://issues.apache.org/jira/browse/KAFKA-47 Project: Kafka Issue Type: Bug Reporter: Jun Rao We need the DDL syntax for creating new topics. May need to use things like javaCC. Also, we need to register new data structures in ZK accordingly. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
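The two directory layouts being compared can be illustrated with a small path-building sketch; the helper class is hypothetical, and only the {log.dir}/... patterns come from the comment above:

```java
import java.io.File;

public class LogLayout {
    // Current layout: {log.dir}/topicname-partitionid
    public static File flatLayout(File logDir, String topic, int partition) {
        return new File(logDir, topic + "-" + partition);
    }

    // Proposed layout: {log.dir}/topicname/partitionid, which leaves
    // {log.dir}/topicname free for per-topic metadata (e.g. a version/creation id).
    public static File nestedLayout(File logDir, String topic, int partition) {
        return new File(new File(logDir, topic), Integer.toString(partition));
    }

    // With the nested layout, topic metadata gets a natural home.
    public static File topicMetadataDir(File logDir, String topic) {
        return new File(logDir, topic);
    }
}
```

The flat layout offers no per-topic directory, which is why the metadata would otherwise have to be duplicated into each partition directory or pinned to one arbitrarily chosen partition.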
[jira] [Commented] (KAFKA-262) Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own
[ https://issues.apache.org/jira/browse/KAFKA-262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13203748#comment-13203748 ] Jun Rao commented on KAFKA-262: --- Some comments: 1. In ZookeeperConsumerConnector.reflectPartitionOwnershipDecision, the local variable success is not intuitive. It should be named to something like hasFailure. 2. In ZookeeperConsumerConnector.releasePartitionOwnership. It's not clear to me why this method has to take an input parameter. Wouldn't it be simpler to always release partition ownership according to topicRegistry? Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own Key: KAFKA-262 URL: https://issues.apache.org/jira/browse/KAFKA-262 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7 Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7.1 Attachments: kafka-262.patch Original Estimate: 24h Remaining Estimate: 24h The consumer maintains a cache of topics and partitions it owns along with the fetcher queues corresponding to those. But while releasing partition ownership, this cache is not cleared. This leads the consumer to release a partition that it does not own any more. This can also lead the consumer to commit offsets for partitions that it no longer consumes from. The rebalance operation goes through following steps - 1. close fetchers 2. commit offsets 3. release partition ownership. 4. rebalance, add topic, partition and fetcher queues to the topic registry, for all topics that the consumer process currently wants to own. 5. If the consumer runs into conflict for one topic or partition, the rebalancing attempt fails, and it goes to step 1. Say, there are 2 consumers in a group, c1 and c2. Both are consuming topic1 with partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1 and c2 owns 1-0. 1. Broker 1 goes down. This triggers rebalancing attempt in c1 and c2. 2. 
c1 releases partition ownership and, during step 4 (above), fails to rebalance. 3. Meanwhile, c2 completes rebalancing successfully, owns partition 0-1, and starts consuming data. 4. c1 starts the next rebalancing attempt and, during step 3 (above), releases partition 0-1. During step 4, it owns partition 0-0 again and starts consuming data. 5. Effectively, rebalancing has completed successfully, but there is no owner for partition 0-1 registered in Zookeeper.
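Jun's suggested simplification (have releasePartitionOwnership always release according to topicRegistry rather than a passed-in parameter) also addresses the stale-cache problem this bug describes, because the registry is cleared as part of the release. A sketch with illustrative types, not the actual ZookeeperConsumerConnector code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OwnershipRelease {
    /**
     * Releases exactly the partitions the consumer's own topicRegistry says it
     * owns, then clears the registry so a later (possibly failed) rebalance
     * cannot release partitions this consumer no longer owns.
     * Keys are topic names; values are the owned partition ids.
     */
    public static List<String> releasePartitionOwnership(Map<String, Set<String>> topicRegistry) {
        List<String> released = new ArrayList<String>();
        for (Map.Entry<String, Set<String>> e : topicRegistry.entrySet()) {
            for (String partition : e.getValue()) {
                // In the real code this is where the owner znode would be deleted.
                released.add(e.getKey() + "/" + partition);
            }
        }
        topicRegistry.clear(); // forget released partitions: the fix for this bug
        return released;
    }
}
```

In the failure scenario above, clearing the registry means c1's second rebalancing attempt cannot release partition 0-1, which c2 now owns.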
[jira] [Commented] (KAFKA-262) Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own
[ https://issues.apache.org/jira/browse/KAFKA-262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13204140#comment-13204140 ] Jun Rao commented on KAFKA-262: --- 2. If releasePartitionOwnership always just checks topicRegistry, the code and the logic will be a bit simpler. Could we run the system test and see if there is any issue with the simplification? Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own Key: KAFKA-262 URL: https://issues.apache.org/jira/browse/KAFKA-262 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7 Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7.1 Attachments: kafka-262.patch Original Estimate: 24h Remaining Estimate: 24h The consumer maintains a cache of topics and partitions it owns along with the fetcher queues corresponding to those. But while releasing partition ownership, this cache is not cleared. This leads the consumer to release a partition that it does not own any more. This can also lead the consumer to commit offsets for partitions that it no longer consumes from. The rebalance operation goes through following steps - 1. close fetchers 2. commit offsets 3. release partition ownership. 4. rebalance, add topic, partition and fetcher queues to the topic registry, for all topics that the consumer process currently wants to own. 5. If the consumer runs into conflict for one topic or partition, the rebalancing attempt fails, and it goes to step 1. Say, there are 2 consumers in a group, c1 and c2. Both are consuming topic1 with partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1 and c2 owns 1-0. 1. Broker 1 goes down. This triggers rebalancing attempt in c1 and c2. 2. c1's release partition ownership and during step 4 (above), fails to rebalance. 3. Meanwhile, c2 completes rebalancing successfully, and owns partition 0-1 and starts consuming data. 4. 
c1 starts next rebalancing attempt and during step 3 (above), it releases partition 0-1. During step 4, it owns partition 0-0 again, and starts consuming data. 5. Effectively, rebalancing has completed successfully, but there is no owner for partition 0-1 registered in Zookeeper. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13204170#comment-13204170 ] Jun Rao commented on KAFKA-240: --- Prashanth, thanks for the patch. Looks good overall. 1. FetchResponse: Since we already have error code at the PartitionData level, should we get rid of the error code at the FetchResponse level? There is no obvious use case for the latter. 2. Should FetchResponse.messageSet return ByteBufferMessageSet? That way caller doesn't have to cast. 3. Could you rebase to the latest 0.8 branch? Joe, do you think we can take Prashanth's patch first and apply your patch on top later? implement new producer and consumer request format -- Key: KAFKA-240 URL: https://issues.apache.org/jira/browse/KAFKA-240 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8 Reporter: Jun Rao Labels: fetch, replication, wireprotocol Fix For: 0.8 Attachments: KAFKA-240-FetchRequest-v1.patch We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-262) Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own
[ https://issues.apache.org/jira/browse/KAFKA-262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13204203#comment-13204203 ] Jun Rao commented on KAFKA-262: --- In releasePartitionOwnership(), topicAndPartitionsToBeReleased is no longer used and should be removed. Otherwise, the patch looks good. Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own Key: KAFKA-262 URL: https://issues.apache.org/jira/browse/KAFKA-262 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7 Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7.1 Attachments: kafka-262-v3.patch, kafka-262.patch Original Estimate: 24h Remaining Estimate: 24h The consumer maintains a cache of topics and partitions it owns along with the fetcher queues corresponding to those. But while releasing partition ownership, this cache is not cleared. This leads the consumer to release a partition that it does not own any more. This can also lead the consumer to commit offsets for partitions that it no longer consumes from. The rebalance operation goes through following steps - 1. close fetchers 2. commit offsets 3. release partition ownership. 4. rebalance, add topic, partition and fetcher queues to the topic registry, for all topics that the consumer process currently wants to own. 5. If the consumer runs into conflict for one topic or partition, the rebalancing attempt fails, and it goes to step 1. Say, there are 2 consumers in a group, c1 and c2. Both are consuming topic1 with partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1 and c2 owns 1-0. 1. Broker 1 goes down. This triggers rebalancing attempt in c1 and c2. 2. c1's release partition ownership and during step 4 (above), fails to rebalance. 3. Meanwhile, c2 completes rebalancing successfully, and owns partition 0-1 and starts consuming data. 4. c1 starts next rebalancing attempt and during step 3 (above), it releases partition 0-1. 
During step 4, it owns partition 0-0 again, and starts consuming data. 5. Effectively, rebalancing has completed successfully, but there is no owner for partition 0-1 registered in Zookeeper. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13204258#comment-13204258 ] Jun Rao commented on KAFKA-240: --- Also, for the builder for FetchRequest. My concern is that it's not clear what the required parameters are. implement new producer and consumer request format -- Key: KAFKA-240 URL: https://issues.apache.org/jira/browse/KAFKA-240 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8 Reporter: Jun Rao Labels: fetch, replication, wireprotocol Fix For: 0.8 Attachments: KAFKA-240-FetchRequest-v1.patch We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
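A common remedy for the concern above (unclear required parameters in a builder) is to move the required fields into the builder's constructor and keep only optional fields as chained setters. A generic sketch, not the actual FetchRequest builder:

```java
public class FetchRequestBuilderSketch {
    private final String clientId;   // required: must be passed to the constructor
    private final int replicaId;     // required
    private int maxWait = 0;         // optional: defaulted, settable via chaining
    private int minBytes = 0;        // optional

    public FetchRequestBuilderSketch(String clientId, int replicaId) {
        this.clientId = clientId;
        this.replicaId = replicaId;
    }

    public FetchRequestBuilderSketch maxWait(int ms) { this.maxWait = ms; return this; }
    public FetchRequestBuilderSketch minBytes(int b) { this.minBytes = b; return this; }

    /** Stand-in for constructing the real request object. */
    public String build() {
        return clientId + ":" + replicaId + ":" + maxWait + ":" + minBytes;
    }
}
```

This way a caller cannot construct a builder without the required fields, and the optional ones read naturally: `new FetchRequestBuilderSketch("mirror", -1).maxWait(100).build()`.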
[jira] [Commented] (KAFKA-253) Refactor the async producer to have only one queue instead of one queue per broker in a Kafka cluster
[ https://issues.apache.org/jira/browse/KAFKA-253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13203209#comment-13203209 ] Jun Rao commented on KAFKA-253: --- Thanks for the review. Committed to 0.8 branch. Will close the jira once it's committed to 0.7 branch. Refactor the async producer to have only one queue instead of one queue per broker in a Kafka cluster - Key: KAFKA-253 URL: https://issues.apache.org/jira/browse/KAFKA-253 Project: Kafka Issue Type: Improvement Affects Versions: 0.8 Reporter: Neha Narkhede Assignee: Jun Rao Attachments: kafka-253.patch, kafka-253_v2.patch, kafka-253_v3.patch, kafka-253_v4.patch, kafka-253_v5.patch Original Estimate: 168h Remaining Estimate: 168h Today, the async producer is associated with a particular broker instance, just like the SyncProducer. The Producer maintains a producer pool of sync/async producers, one per broker. Since the producer pool creates one async producer per broker, we have multiple producer queues for one Producer instance. With replication, a topic partition will be logical. This requires refactoring the AsyncProducer to be broker agnostic. As a side effect of this refactoring, we should also ensure that we have only one queue per Producer instance. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13199901#comment-13199901 ] Jun Rao commented on KAFKA-240: --- Joe, Prashanth, Do you guys have patches that we can review now? We are getting very close to start using the new request format in 0.8. Thanks, implement new producer and consumer request format -- Key: KAFKA-240 URL: https://issues.apache.org/jira/browse/KAFKA-240 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao Fix For: 0.8 We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-251) The ConsumerStats MBean's PartOwnerStats attribute is a string
[ https://issues.apache.org/jira/browse/KAFKA-251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13199447#comment-13199447 ] Jun Rao commented on KAFKA-251: --- Sorry, just got the chance to look at this. Couldn't apply the patch since trunk has moved. Could you rebase? The ConsumerStats MBean's PartOwnerStats attribute is a string --- Key: KAFKA-251 URL: https://issues.apache.org/jira/browse/KAFKA-251 Project: Kafka Issue Type: Bug Reporter: Pierre-Yves Ritschard Attachments: 0001-Incorporate-Jun-Rao-s-comments-on-KAFKA-251.patch, 0001-Provide-a-patch-for-KAFKA-251.patch The fact that the PartOwnerStats is a string prevents monitoring systems from graphing consumer lag. There should be one mbean per [ topic, partition, groupid ] group.
[jira] [Commented] (KAFKA-254) A tool to GET Zookeeper partition-offset and output to files
[ https://issues.apache.org/jira/browse/KAFKA-254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13199458#comment-13199458 ] Jun Rao commented on KAFKA-254: --- Thanks for the patch. A few comments: 1. For consistency, let's use joptsimple for command line processing (take a look at some other tools as examples). 2. We should use ZkClient, instead of the raw ZK client (see ConsumerOffsetChecker) 3. Could we extend this to support multiple consumer groups? We can use an option --groups to specify all groups. If the option is not specified, we can get all groups registered in ZK. We will need to include group name in the output file. 4. Use the zk common paths already defined in ZkUtils. Those comments are applicable to kafka-255 too. A tool to GET Zookeeper partition-offset and output to files Key: KAFKA-254 URL: https://issues.apache.org/jira/browse/KAFKA-254 Project: Kafka Issue Type: Task Components: clients Reporter: John Fung Assignee: John Fung Attachments: kafka-254-v1.patch A utility that retrieves the offsets of all topic partitions in ZK for a specified group id and save the data to output files. A shell script also comes with this tool to automate the writing of data to files in a specified time interval. This utility expects 3 arguments: 1. Zk host:port string 2. group Id 3. Output file pathname -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
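For point 3 above (supporting multiple consumer groups and including the group name in the output file), the output-formatting step might look like this; the record layout and helper class are illustrative, not part of the actual tool:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OffsetLines {
    /**
     * Formats one "group topic partition offset" line per committed offset.
     * Keys of partitionOffsets are "topic:partition"; values are offsets.
     * Sorting via TreeMap keeps the output file deterministic.
     */
    public static List<String> format(String group, Map<String, Long> partitionOffsets) {
        List<String> lines = new ArrayList<String>();
        for (Map.Entry<String, Long> e : new TreeMap<String, Long>(partitionOffsets).entrySet()) {
            lines.add(group + " " + e.getKey().replace(':', ' ') + " " + e.getValue());
        }
        return lines;
    }
}
```

When --groups is omitted, the tool would enumerate all groups under the consumers path in ZK and emit one such block per group, so downstream scripts can tell the groups apart.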
[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer
[ https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13192287#comment-13192287 ] Jun Rao commented on KAFKA-238: --- 3. I agree that the API is rarely used. However, each call could request a large number of topics. This is particularly true in the consumer where we make multifetch requests (may consist of hundreds of topics). I wasn't thinking of caching broker info permanently. Rather, we can just cache the broker info within a single getMetaData request. For example, if we have 200 topics (1 partition per topic) and 4 brokers, instead of making 200 ZK read requests, we just need to make 4 requests. 4. I am ok with tracking this in a separate jira. Some new comments: 6. Remove unused imported package in AdminUtils, SyncProducerTest, etc. 7. TopicMetadataRequest.deserializeTopicMetadata should be sth like deserializeTopicMetadataList 8. TopicMetaData: keep comments consistent with implementation (e.g., doesLeaderExist is 1 byte now) 9. ZkUtils: we have a mix of getTopicPartition* and getTopicPart*. We can use either format. We just need to be consistent. add a getTopicMetaData method in broker and expose it to producer -- Key: KAFKA-238 URL: https://issues.apache.org/jira/browse/KAFKA-238 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao Assignee: Neha Narkhede Attachments: kafka-238-v1.patch, kafka-238-v2.patch We need a way to propagate the leader and the partition information to the producer so that it can do load balancing and semantic partitioning. One way to do that is to have the producer get the information from ZK directly. This means that the producer needs to maintain a ZK session and has to subscribe to watchers, which can be complicated. An alternative approach is to have the following api on the broker. 
TopicMetaData getTopicMetaData(String: topic)

TopicMetaData {
  Array[PartitionMetaData]: partitionsMetaData
}

PartitionMetaData {
  Int: partitionId
  String: leaderHostname
  Int: leaderPort
}

Using this api, the producer can get the metadata about a topic during initial startup or leadership change of a partition.
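Point 3 of the review comment above (collect the unique broker ids and read each from ZK only once per getMetaData request) amounts to memoizing the lookup within a single request. A sketch with a stand-in lookup function; nothing here is the actual AdminUtils code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

public class BrokerInfoCache {
    /**
     * Resolves broker info for each partition's broker id, consulting the
     * lookup function (e.g. a ZK read) at most once per unique id.
     * Returns the resolved info in partition order.
     */
    public static List<String> resolve(int[] brokerIds, IntFunction<String> zkLookup) {
        Map<Integer, String> memo = new HashMap<Integer, String>();
        List<String> result = new ArrayList<String>();
        for (int id : brokerIds) {
            String info = memo.get(id);
            if (info == null) {
                info = zkLookup.apply(id); // one ZK read per unique broker id
                memo.put(id, info);
            }
            result.add(info);
        }
        return result;
    }
}
```

With 200 single-partition topics spread over 4 brokers, this turns 200 ZK reads into 4, as the example in the comment describes.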
[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer
[ https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13192450#comment-13192450 ] Jun Rao commented on KAFKA-238: --- +1 on the patch. This patch doesn't wire the getMetaData api in the producer/consumer. Is the intention to do that as part of kafka-239 ? add a getTopicMetaData method in broker and expose it to producer -- Key: KAFKA-238 URL: https://issues.apache.org/jira/browse/KAFKA-238 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao Assignee: Neha Narkhede Attachments: kafka-238-v1.patch, kafka-238-v2.patch, kafka-238-v3.patch We need a way to propagate the leader and the partition information to the producer so that it can do load balancing and semantic partitioning. One way to do that is to have the producer get the information from ZK directly. This means that the producer needs to maintain a ZK session and has to subscribe to watchers, which can be complicated. An alternative approach is to have the following api on the broker. TopicMetaData getTopicMetaData(String: topic) TopicMetaData { Array[PartitionMetaData]: partitionsMetaData } PartitionMetaData { Int: partitionId String: leaderHostname Int: leaderPort } Using this api, the producer can get the metadata about a topic during initial startup or leadership change of a partition. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-251) The ConsumerStats MBean's PartOwnerStats attribute is a string
[ https://issues.apache.org/jira/browse/KAFKA-251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13192736#comment-13192736 ] Jun Rao commented on KAFKA-251: --- Thanks for the patch. Some comments: 1. There is compilation error. 2. Use space instead of TAB. 3. The new jmx seems to be intended for replacing ZookeeperConsumerConnectorMBean. If so, we should remove ZookeeperConsumerConnectMBean and the implementation in ZookeeperConsumerConnector. 4. Not every SyncRebalance triggers a successful rebalance (sometimes rebalance is not necessary). So, instead of unregistering/registering the MBean in SyncRebalance, we should do unregisterMBeans followed by registerMBeans at the end of updateFetcher, which is when a rebalance really happens. The ConsumerStats MBean's PartOwnerStats attribute is a string --- Key: KAFKA-251 URL: https://issues.apache.org/jira/browse/KAFKA-251 Project: Kafka Issue Type: Bug Reporter: Pierre-Yves Ritschard Attachments: 0001-Provide-a-patch-for-KAFKA-251.patch The fact that the PartOwnerStats is a string prevents monitoring systems from graphing consumer lag. There should be one mbean per [ topic, partition, groupid ] group. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
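The "one mbean per [ topic, partition, groupid ]" layout requested in this issue usually maps to one JMX ObjectName per combination, so monitoring systems can query lag as a numeric attribute per partition. The domain and key names below are illustrative, not Kafka's actual MBean naming:

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class LagMBeanName {
    /** Builds a per-(group, topic, partition) MBean name and returns its canonical form. */
    public static String forLag(String group, String topic, int partition) {
        try {
            return new ObjectName("kafka.consumer:type=ConsumerLag,group=" + group
                    + ",topic=" + topic + ",partition=" + partition).getCanonicalName();
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

A grapher can then match `kafka.consumer:type=ConsumerLag,*` and plot each matching MBean's lag attribute separately, which a single string-valued attribute cannot support.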
[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer
[ https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13191262#comment-13191262 ] Jun Rao commented on KAFKA-238: --- The patch doesn't apply to the 0.8 branch. Could you rebase?

can't find file to patch at input line 2260
Perhaps you used the wrong -p or --strip option?
The text leading up to this was:
--
|Index: core/src/main/scala/kafka/api/MultiMessageSetSend.scala
|===
|--- core/src/main/scala/kafka/api/MultiMessageSetSend.scala (revision 1232541)
|+++ core/src/main/scala/kafka/api/MultiMessageSetSend.scala (working copy)

add a getTopicMetaData method in broker and expose it to producer -- Key: KAFKA-238 URL: https://issues.apache.org/jira/browse/KAFKA-238 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao Assignee: Neha Narkhede Attachments: kafka-238-v1.patch We need a way to propagate the leader and the partition information to the producer so that it can do load balancing and semantic partitioning. One way to do that is to have the producer get the information from ZK directly. This means that the producer needs to maintain a ZK session and has to subscribe to watchers, which can be complicated. An alternative approach is to have the following api on the broker. TopicMetaData getTopicMetaData(String: topic) TopicMetaData { Array[PartitionMetaData]: partitionsMetaData } PartitionMetaData { Int: partitionId String: leaderHostname Int: leaderPort } Using this api, the producer can get the metadata about a topic during initial startup or leadership change of a partition.
[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer
[ https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13191350#comment-13191350 ] Jun Rao commented on KAFKA-238: --- Some comments: 1. Use the new Logger helper for new log4j logging. 2. PartitionMetaData: The leader indicator just needs 1 byte, instead of 2. 3. AdminUtils.getTopicMetaDataFromZK(): Instead of getting broker info from ZK each time, could we collect a set of unique broker ids that are needed and call ZK only once for each unique id to get the broker info? 4. LogMetaData: LogMetaData is actually replica-level info, not partition-level info. Collecting such data in the same getTopicMetaData api is tricky since log meta data is only available at the broker that stores a replica locally. So, this info is not available at every broker; you have to talk to the right broker to get such information. One possibility is to keep this as a separate api, something like getReplicaMetadata(topic, partition, brokerid). Such information is only returned if the api is called on the broker that owns the partition. 5. SyncProducerTest: no need to extend ZooKeeperTestHarness since it's included in KafkaServerTestHarness now. 6. We probably need to separate ZK from LogManager. This allows us to use and test LogManager independently. We can probably attach a handler to the LogManager. This can be a separate jira. add a getTopicMetaData method in broker and expose it to producer -- Key: KAFKA-238 URL: https://issues.apache.org/jira/browse/KAFKA-238 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao Assignee: Neha Narkhede Attachments: kafka-238-v1.patch We need a way to propagate the leader and the partition information to the producer so that it can do load balancing and semantic partitioning. One way to do that is to have the producer get the information from ZK directly.
This means that the producer needs to maintain a ZK session and has to subscribe to watchers, which can be complicated. An alternative approach is to have the following api on the broker.

TopicMetaData getTopicMetaData(String: topic)

TopicMetaData {
  Array[PartitionMetaData]: partitionsMetaData
}

PartitionMetaData {
  Int: partitionId
  String: leaderHostname
  Int: leaderPort
}

Using this api, the producer can get the metadata about a topic during initial startup or leadership change of a partition.
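The proposed response types above can be sketched as plain value classes. This is a minimal illustration only: the field names come from the pseudo-API in the description, but the Java records and the `leaderFor` helper are my own assumptions, not committed Kafka code.

```java
import java.util.List;

// Hypothetical rendering of the proposed getTopicMetaData response types.
// Field names mirror the pseudo-API in the issue description.
record PartitionMetaData(int partitionId, String leaderHostname, int leaderPort) {}

record TopicMetaData(List<PartitionMetaData> partitionsMetaData) {
    // A convenience lookup a producer might use after a leadership change:
    // find the current leader endpoint for a given partition.
    PartitionMetaData leaderFor(int partitionId) {
        return partitionsMetaData.stream()
                .filter(p -> p.partitionId() == partitionId)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("unknown partition " + partitionId));
    }
}

public class TopicMetaDataSketch {
    public static void main(String[] args) {
        TopicMetaData md = new TopicMetaData(List.of(
                new PartitionMetaData(0, "broker1", 9092),
                new PartitionMetaData(1, "broker2", 9092)));
        PartitionMetaData leader = md.leaderFor(1);
        System.out.println(leader.leaderHostname() + ":" + leader.leaderPort()); // prints broker2:9092
    }
}
```

With this shape, the producer refreshes its partition-to-leader map once at startup and again whenever a send fails due to a moved leader.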
[jira] [Commented] (KAFKA-41) multi-produce and multi-fetch support with replication
[ https://issues.apache.org/jira/browse/KAFKA-41?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13191446#comment-13191446 ] Jun Rao commented on KAFKA-41: -- Yes, this may be covered by the new wire protocol change and KAFKA-239. multi-produce and multi-fetch support with replication -- Key: KAFKA-41 URL: https://issues.apache.org/jira/browse/KAFKA-41 Project: Kafka Issue Type: Bug Reporter: Jun Rao We need to figure out how to support multi-produce and multi-fetch smoothly with the replication support. The client has to figure out which partitions are collocated on the same broker and adjust accordingly when some partitions are moved.
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13191471#comment-13191471 ] Jun Rao commented on KAFKA-240: --- Joe, The changes in GitHub make sense. A couple of suggestions: 1. We can probably use a case class for ProduceRequest. 2. The return type of the produce request handler should be Option[ProduceResponse] since the response is optional. implement new producer and consumer request format -- Key: KAFKA-240 URL: https://issues.apache.org/jira/browse/KAFKA-240 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao Fix For: 0.8 We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13188716#comment-13188716 ] Jun Rao commented on KAFKA-240: --- Joe, I think it could be simpler than that. You can have the following encoding:

# of topics (2 bytes)
topic1 encoding (see encoding format below)
...
topicN encoding

Topic Encoding:
topic name length (2 bytes)
topic name bytes
# of partitions (2 bytes)
partition1 encoding (see encoding format below)
partitionN encoding

Partition encoding:
partition id: (4 bytes)
messageSet encoding

implement new producer and consumer request format -- Key: KAFKA-240 URL: https://issues.apache.org/jira/browse/KAFKA-240 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao Fix For: 0.8 We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal
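The layout sketched in this comment can be written out as a small ByteBuffer serializer. This is purely illustrative: the class and method names are hypothetical, and the message set is treated as opaque bytes rather than Kafka's actual MessageSet encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative encoder for the layout sketched in the comment:
//   2-byte topic count, then per topic: 2-byte name length, name bytes,
//   2-byte partition count, then per partition: 4-byte id + messageSet bytes.
public class RequestEncodingSketch {
    static void encodeTopic(ByteBuffer buf, String topic, int[] partitionIds, byte[] messageSet) {
        byte[] name = topic.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) name.length);         // topic name length (2 bytes)
        buf.put(name);                             // topic name bytes
        buf.putShort((short) partitionIds.length); // # of partitions (2 bytes)
        for (int id : partitionIds) {
            buf.putInt(id);                        // partition id (4 bytes)
            buf.put(messageSet);                   // messageSet encoding (opaque here)
        }
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putShort((short) 1);                   // # of topics (2 bytes)
        encodeTopic(buf, "test", new int[] {0}, new byte[] {1, 2, 3});
        buf.flip();
        System.out.println("encoded bytes: " + buf.remaining()); // 2+2+4+2+4+3 = 17
    }
}
```

The per-topic grouping is what makes this simpler than the earlier proposal: the topic name is written once, followed by all of its partitions.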
[jira] [Commented] (KAFKA-245) upgrade to zkclient 0.1
[ https://issues.apache.org/jira/browse/KAFKA-245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13185177#comment-13185177 ] Jun Rao commented on KAFKA-245: --- Do we know the github revision/date corresponding to the zkclient 0.1 release? upgrade to zkclient 0.1 --- Key: KAFKA-245 URL: https://issues.apache.org/jira/browse/KAFKA-245 Project: Kafka Issue Type: New Feature Components: core Affects Versions: 0.7 Reporter: Pierre-Yves Ritschard Assignee: Pierre-Yves Ritschard Labels: newbie Attachments: 0001-KAFKA-245-zkclient-0.1-to-enable-maven-syncing.patch, 0003-follow-up-to-KAFKA-245-use-maven-to-fetch-zkclient.patch the zkclient jar bundled with kafka should be synced with what is available on maven central. the artifact which has group com.github.sgroschupf, artifact id zkclient and version 0.1 is from a day after the one bundled with kafka and should thus be sufficient for kafka's needs. I have tested it locally and find no regressions.
[jira] [Commented] (KAFKA-232) ConsumerConnector has no access to getOffsetsBefore
[ https://issues.apache.org/jira/browse/KAFKA-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13185190#comment-13185190 ] Jun Rao commented on KAFKA-232: --- We discussed on the mailing list before about allowing the ZK-based consumer to consume from an arbitrary offset during startup time. Here is the main complexity. Multiple consumers in the same group can consume a topic jointly. When they start up, which consumer sets the offset for which partitions? How do we prevent 2 consumers from setting the offset for the same partition? ConsumerConnector has no access to getOffsetsBefore -- Key: KAFKA-232 URL: https://issues.apache.org/jira/browse/KAFKA-232 Project: Kafka Issue Type: New Feature Reporter: Edward Capriolo Priority: Minor kafka.javaapi.SimpleConsumer has getOffsetsBefore. I would like this ability in KafkaMessageStream or in ConsumerConnector. In this way clients can access their current position.
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13184455#comment-13184455 ] Jun Rao commented on KAFKA-240: --- Joe, That's a good question. We had some discussions on this in the mailing list before. I agree it would be nice if we can make the 0.8 release backward compatible. However, it's a little bit hard because: 1. We need a process to migrate the ZK data structures and potentially the on-disk data organization. This is particularly hard since we need to migrate a whole Kafka cluster online. In the middle of the migration, some brokers will be using the old structures and some other brokers will be using the new structures. It's not clear how a consumer should behave in this stage. 2. If something terribly wrong happens during the migration, there is no easy way to roll back the migration. An alternative is to make the 0.8 release non-backward compatible. This allows us to incorporate any wire/on-disk changes freely. To upgrade from 0.7 to 0.8, one possibility is to start a new 0.8 cluster. We can provide a tool that continuously mirrors data from the 0.7 cluster to the new 0.8 cluster. Once that's done, we can first upgrade the consumers and point them to the 0.8 cluster, followed by upgrading the producers and pointing them to the 0.8 cluster. This is somewhat more operational work. However, it addresses both of the above issues. implement new producer and consumer request format -- Key: KAFKA-240 URL: https://issues.apache.org/jira/browse/KAFKA-240 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao Fix For: 0.8 We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal
[jira] [Commented] (KAFKA-202) Make the request processing in kafka asynchronous
[ https://issues.apache.org/jira/browse/KAFKA-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13184626#comment-13184626 ] Jun Rao commented on KAFKA-202: --- unit tests now pass. +1 on the patch. We should probably commit this patch to an 0.8 branch. Make the request processing in kafka asynchronous Key: KAFKA-202 URL: https://issues.apache.org/jira/browse/KAFKA-202 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-202-v2.patch, KAFKA-202-v3.patch, KAFKA-202-v4.patch, KAFKA-202-v5.patch, KAFKA-202-v6.patch, KAFKA-48-socket-server-refactor-draft.patch We need to handle long-lived requests to support replication. To make this work we need to make the processing mechanism asynchronous from the network threads. To accomplish this we will retain the existing pool of network threads but add a new pool of request handling threads. These will do all the disk I/O. There will be a queue mechanism to transfer requests to and from this secondary pool.
[jira] [Commented] (KAFKA-228) Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0
[ https://issues.apache.org/jira/browse/KAFKA-228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13181491#comment-13181491 ] Jun Rao commented on KAFKA-228: --- Thanks for the new patch. Overall, looks pretty good. A few extra comments: 4. ConsumerIterator: Could we make currentDataChunk a local variable in makeNext? This is an existing problem, but it would be good if we can fix it in this patch. Also, currentTopicInfo doesn't have to be atomic since it's never updated concurrently. 5. It's better to set the KafkaMessageStreams in the constructor of ZKRebalanceListener. This way, even if the ZK listener gets triggered before the ZookeeperConsumerConnector.consume completes, KafkaMessageStreams is available in the rebalance call in the listener. 6. Fetcher: remove shutdown and keep only stopConnectionsToAllBrokers. Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0 --- Key: KAFKA-228 URL: https://issues.apache.org/jira/browse/KAFKA-228 Project: Kafka Issue Type: Improvement Affects Versions: 0.7 Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7.1 Attachments: kafka-228-v3.patch, kafka-228.patch, kafka-228_v2.patch Kafka guarantees at-least-once delivery of messages. The high level consumer provides highly available partitioned consumption of data within the same consumer group. In the event of broker failures or consumer failures within a group, the high level consumer rebalances and redistributes the topic partitions evenly amongst the consumers in a group. With the current design, during this rebalancing operation, Kafka introduces duplicates in the consumed data. This JIRA improves the rebalancing operation and the consumer iterator design to guarantee 0 duplicates while consuming uncompressed topics. There will be a small number of duplicates while serving compressed data, but it will be bound by the compression batch size.
[jira] [Commented] (KAFKA-236) Make 'autooffset.reset' accept a delay in addition to {smallest,largest}
[ https://issues.apache.org/jira/browse/KAFKA-236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13180528#comment-13180528 ] Jun Rao commented on KAFKA-236: --- To reset the offset, under the covers, we use getLatestOffsetBefore(), which supports an arbitrary starting timestamp, in addition to smallest and largest. However, getLatestOffsetBefore has a very coarse granularity at this moment. It only returns the offset of the first message in a log segment file based on its last modified time. So, depending on the log segment size, this may or may not be good enough for some applications. Make 'autooffset.reset' accept a delay in addition to {smallest,largest} Key: KAFKA-236 URL: https://issues.apache.org/jira/browse/KAFKA-236 Project: Kafka Issue Type: Improvement Reporter: Mathias Herberts Add the possibility to specify a delay in ms which would be used when resetting offset. This would allow for example a client to specify it would like its offset to be reset to the first offset before/after the current time - the given offset.
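The segment-level granularity described in the comment can be made concrete with a small sketch: each segment contributes one candidate offset (its first message), keyed by the segment file's last-modified time. The `Segment` record and `offsetBefore` helper are my own illustration, not Kafka's actual Log/LogSegment code.

```java
import java.util.List;

// Sketch of the coarse-grained timestamp-to-offset lookup the comment
// describes. Precision is bounded by segment size: any target time that
// falls inside a segment resolves to that segment's first offset.
public class OffsetsBeforeSketch {
    record Segment(long lastModifiedMs, long startOffset) {}

    // Return the start offset of the latest segment modified at or before
    // the target time; -1 if no segment qualifies.
    static long offsetBefore(List<Segment> segments, long targetMs) {
        long best = -1;
        for (Segment s : segments) {
            if (s.lastModifiedMs() <= targetMs) best = Math.max(best, s.startOffset());
        }
        return best;
    }

    public static void main(String[] args) {
        List<Segment> segs = List.of(
                new Segment(1_000L, 0L),
                new Segment(2_000L, 500L),
                new Segment(3_000L, 900L));
        // A target between two segment boundaries still snaps back to a
        // segment's first offset -- this is the granularity limitation.
        System.out.println(offsetBefore(segs, 2_500L)); // prints 500
    }
}
```

A "current time minus delay" reset policy would simply compute the target timestamp from the configured delay and feed it into a lookup like this.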
[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer
[ https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13180677#comment-13180677 ] Jun Rao commented on KAFKA-238: --- We can potentially piggy-back auto topic creation on the getTopicMetaData api. If auto topic creation is enabled and the broker sees no ZK data for a topic, the api will create the topic first. add a getTopicMetaData method in broker and expose it to producer -- Key: KAFKA-238 URL: https://issues.apache.org/jira/browse/KAFKA-238 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao We need a way to propagate the leader and the partition information to the producer so that it can do load balancing and semantic partitioning. One way to do that is to have the producer get the information from ZK directly. This means that the producer needs to maintain a ZK session and has to subscribe to watchers, which can be complicated. An alternative approach is to have the following api on the broker. TopicMetaData getTopicMetaData(String: topic) TopicMetaData { Array[PartitionMetaData]: partitionsMetaData } PartitionMetaData { Int: partitionId String: leaderHostname Int: leaderPort } Using this api, the producer can get the metadata about a topic during initial startup or leadership change of a partition.
[jira] [Commented] (KAFKA-232) ConsumerConnector has no access to getOffsetsBefore
[ https://issues.apache.org/jira/browse/KAFKA-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13180841#comment-13180841 ] Jun Rao commented on KAFKA-232: --- Do you think you can add meta data in the message itself so that you know whether the consumed data has moved to the next minute? If so, you can turn off auto commit and call commitOffset() directly. ConsumerConnector has no access to getOffsetsBefore -- Key: KAFKA-232 URL: https://issues.apache.org/jira/browse/KAFKA-232 Project: Kafka Issue Type: New Feature Reporter: Edward Capriolo Priority: Minor kafka.javaapi.SimpleConsumer has getOffsetsBefore. I would like this ability in KafkaMessageStream or in ConsumerConnector. In this way clients can access their current position.
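The pattern suggested in this comment, embed a timestamp in each message, turn off auto commit, and commit only when consumption crosses a minute boundary, can be sketched as below. The `Runnable` stands in for the connector's commit call; all names here are illustrative, not the actual consumer API.

```java
// Sketch of commit-on-minute-rollover: offsets are committed only once
// all messages for the previous minute have been consumed, so the
// committed position always sits on a minute boundary.
public class MinuteCommitSketch {
    private long lastMinute = -1;
    private final Runnable commitOffsets; // stand-in for the connector's commit call
    private int commits = 0;

    MinuteCommitSketch(Runnable commitOffsets) { this.commitOffsets = commitOffsets; }

    // Call once per consumed message with the timestamp carried in its payload.
    void onMessage(long timestampMs) {
        long minute = timestampMs / 60_000L;
        if (lastMinute != -1 && minute != lastMinute) {
            commitOffsets.run(); // previous minute fully consumed
            commits++;
        }
        lastMinute = minute;
    }

    int commitCount() { return commits; }

    public static void main(String[] args) {
        MinuteCommitSketch sketch = new MinuteCommitSketch(() -> {});
        sketch.onMessage(59_000L); // minute 0
        sketch.onMessage(61_000L); // minute 1: boundary crossed, commit
        sketch.onMessage(62_000L); // still minute 1, no commit
        System.out.println(sketch.commitCount()); // prints 1
    }
}
```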
[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer
[ https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13180855#comment-13180855 ] Jun Rao commented on KAFKA-238: --- I can see it's useful for the api to support a list of topics. Returning all topics is probably too much overhead since most producers are interested in a subset of topics. In our current replication design, both send and fetch requests are served only by the leader. We can balance the load by having multiple partitions per broker. We may allow followers to serve reads in the future. I don't see a clear benefit at this moment though. add a getTopicMetaData method in broker and expose it to producer -- Key: KAFKA-238 URL: https://issues.apache.org/jira/browse/KAFKA-238 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao We need a way to propagate the leader and the partition information to the producer so that it can do load balancing and semantic partitioning. One way to do that is to have the producer get the information from ZK directly. This means that the producer needs to maintain a ZK session and has to subscribe to watchers, which can be complicated. An alternative approach is to have the following api on the broker. TopicMetaData getTopicMetaData(String: topic) TopicMetaData { Array[PartitionMetaData]: partitionsMetaData } PartitionMetaData { Int: partitionId String: leaderHostname Int: leaderPort } Using this api, the producer can get the metadata about a topic during initial startup or leadership change of a partition.
[jira] [Commented] (KAFKA-202) Make the request processing in kafka asynchronous
[ https://issues.apache.org/jira/browse/KAFKA-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13181141#comment-13181141 ] Jun Rao commented on KAFKA-202: --- Overall, the patch looks good. Some comments: 1. KafkaServer.startup doesn't have to capture exception and shutdown. The caller in KafkaServerStartable already does that. Plus, it shuts down the embedded consumer appropriately if needed. 2. There is KafkaRequestHandlers.scala.rej in the patch. 3. Unit tests seem to fail occasionally, giving the following error.

[info] == core-kafka / kafka.integration.LazyInitProducerTest ==
[2012-01-05 21:57:38,773] ERROR Error processing MultiProducerRequest on test:0 (kafka.server.KafkaApis:82)
java.nio.channels.ClosedChannelException
 at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
 at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
 at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
 at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
 at kafka.log.Log.append(Log.scala:215)
 at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
 at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
 at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
 at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
 at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:43)
 at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
 at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
 at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,773] ERROR Error processing ProduceRequest on test:0 (kafka.server.KafkaApis:82)
java.nio.channels.ClosedChannelException
 at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
 at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
 at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
 at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
 at kafka.log.Log.append(Log.scala:215)
 at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
 at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
 at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
 at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while handling producer request: null (kafka.server.KafkaApis:92)
java.nio.channels.ClosedChannelException
 at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
 at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
 at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
 at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
 at kafka.log.Log.append(Log.scala:215)
 at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
 at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
 at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
 at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while handling producer request: null (kafka.server.KafkaApis:92)
java.nio.channels.ClosedChannelException
 at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
 at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
 at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
 at kafka.message.FileMessageSet.append(FileMessageSet.scala:161) at
[jira] [Commented] (KAFKA-200) Support configurable send / receive socket buffer size in server
[ https://issues.apache.org/jira/browse/KAFKA-200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13179977#comment-13179977 ] Jun Rao commented on KAFKA-200: --- John, Thanks for the detailed update. From TCP Illustrated section 13.3.3, last paragraph: The shift count is automatically chosen by TCP, based on the size of the receive buffer. So, to set a TCP window size larger than 64K, we just need to make sure the receive buffer size is set properly before the connection is established. So, Approach 3 as you listed is the right thing to do. Could you upload a new patch? Support configurable send / receive socket buffer size in server Key: KAFKA-200 URL: https://issues.apache.org/jira/browse/KAFKA-200 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: John Fung Fix For: 0.8 Attachments: KAFKA-200.patch * Make the send / receive socket buffer size configurable in server. * KafkaConfig.scala already has the following existing variables to support send / receive buffer: socketSendBuffer socketReceiveBuffer * The patch attached to this ticket will read the following existing settings in kafka/config/server.properties and set the corresponding socket buffers . . .

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=1048576
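The ordering constraint discussed above can be sketched with the standard `java.net.Socket` API: TCP window scaling is negotiated during the handshake, so a receive buffer larger than 64K must be requested before `connect()`. The host/port are placeholders; this is an illustration of the ordering, not the patch itself.

```java
import java.io.IOException;
import java.net.Socket;

// Sketch of "Approach 3": size the socket buffers on an unconnected
// socket so the window scale option can be negotiated at connect time.
public class SocketBufferSketch {
    public static void main(String[] args) throws IOException {
        Socket socket = new Socket();            // unconnected socket
        socket.setReceiveBufferSize(1_048_576);  // must happen before connect()
        socket.setSendBufferSize(1_048_576);
        System.out.println("requested rcvbuf: " + socket.getReceiveBufferSize());
        // socket.connect(new java.net.InetSocketAddress("broker-host", 9092));
        socket.close();
    }
}
```

Note that the OS may round the requested size, so the value read back is a hint, not a guarantee; setting the buffer after `connect()` only resizes the buffer without enabling a larger advertised window.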
[jira] [Commented] (KAFKA-232) ConsumerConnector has no access to getOffsetsBefore
[ https://issues.apache.org/jira/browse/KAFKA-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13179211#comment-13179211 ] Jun Rao commented on KAFKA-232: --- Could you explain in a bit more detail how you plan to use this method? Today, ConsumerConnector doesn't support consuming from an arbitrary offset. ConsumerConnector has no access to getOffsetsBefore -- Key: KAFKA-232 URL: https://issues.apache.org/jira/browse/KAFKA-232 Project: Kafka Issue Type: New Feature Reporter: Edward Capriolo Priority: Minor kafka.javaapi.SimpleConsumer has getOffsetsBefore. I would like this ability in KafkaMessageStream or in ConsumerConnector. In this way clients can access their current position.
[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request
[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13178466#comment-13178466 ] Jun Rao commented on KAFKA-48: -- Taylor, Sorry for the late response. I am not sure that I understand your proposal. a. Why do we need a local socket? It seems that the same thing can be achieved by just turning on the write-interest bit in the socket key corresponding to a client request. b. It's not clear to me how you correlate a queued client request with the corresponding client socket. Implement optional long poll support in fetch request --- Key: KAFKA-48 URL: https://issues.apache.org/jira/browse/KAFKA-48 Project: Kafka Issue Type: Bug Reporter: Alan Cabrera Assignee: Jay Kreps Currently, the fetch request is non-blocking. If there is nothing on the broker for the consumer to retrieve, the broker simply returns an empty set to the consumer. This can be inefficient, if you want to ensure low-latency because you keep polling over and over. We should make a blocking version of the fetch request so that the fetch request is not returned until the broker has at least one message for the fetcher or some timeout passes.
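The long-poll semantics the issue asks for, block until at least one message is available or a timeout elapses, can be modeled in-process with a blocking queue. This is a sketch of the semantics only, not the broker's NIO-based implementation that the comment discusses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Models the requested blocking fetch: wait up to maxWaitMs for the first
// message, then drain whatever else is already buffered. An empty result
// means the timeout expired, not that the caller should spin and re-poll.
public class LongPollSketch {
    private final BlockingQueue<String> log = new LinkedBlockingQueue<>();

    void append(String message) { log.add(message); }

    List<String> fetch(long maxWaitMs) throws InterruptedException {
        List<String> result = new ArrayList<>();
        String first = log.poll(maxWaitMs, TimeUnit.MILLISECONDS); // blocks here
        if (first != null) {
            result.add(first);
            log.drainTo(result); // grab anything else already available
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        LongPollSketch broker = new LongPollSketch();
        new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) {}
            broker.append("m1"); // arrives while fetch is parked
        }).start();
        System.out.println(broker.fetch(1_000)); // prints [m1]
    }
}
```

Compared with repeated non-blocking fetches, the consumer issues one request and the broker parks it until data arrives, which is exactly the latency/overhead trade the issue describes.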
[jira] [Commented] (KAFKA-228) Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0
[ https://issues.apache.org/jira/browse/KAFKA-228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13172441#comment-13172441 ] Jun Rao commented on KAFKA-228: --- Thanks for the patch. Some comments below: 1. This jira is a duplicate of kafka-198. We need to mark it. 2. ZookeeperConsumerConnector 2.1 commitOffsets(): the "committing all offsets" logging should be at trace or debug level 2.2 The patch commits all offsets for clearing every stream. This is unnecessary. We just need to commit all offsets once after we cleared all streams. Also, we don't need to clear all streams. We only need to clear a stream whose fetch queue needs to be cleared. Here is one way of doing that. We keep a reference to the stream in each fetch queue (there is a 1:1 mapping between stream and fetch queue). In closeFetchers(), we (1) stop the current fetcher; (2) for each fetch queue to be cleared: clear the queue and clear the corresponding stream; (3) commit all offsets. Then, we don't need Fetcher.clearFetcherQueues and we don't need to pass in kafkaMessageStreams in Fetcher.startConnections. Also, ConsumerIterator.clearCurrentChunk doesn't need to take any input parameters. 3. ConsumerIterator: The logic is a bit complicated and I am not sure if it's really necessary. To me, it seems all we need is to make currentDataChunk an AtomicReference. clearCurrentChunk() simply sets the reference to null. This will be the only synchronization that we need (no need for a lock). Because of the ordering in 2.2, this will make sure the next hasNext() call on the stream blocks until the Fetcher is started again.
Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0 --- Key: KAFKA-228 URL: https://issues.apache.org/jira/browse/KAFKA-228 Project: Kafka Issue Type: Improvement Affects Versions: 0.7 Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7.1 Attachments: kafka-228.patch Kafka guarantees at-least-once delivery of messages. The high level consumer provides highly available partitioned consumption of data within the same consumer group. In the event of broker failures or consumer failures within a group, the high level consumer rebalances and redistributes the topic partitions evenly amongst the consumers in a group. With the current design, during this rebalancing operation, Kafka introduces duplicates in the consumed data. This JIRA improves the rebalancing operation and the consumer iterator design to guarantee 0 duplicates while consuming uncompressed topics. There will be a small number of duplicates while serving compressed data, but it will be bound by the compression batch size.
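The simplification proposed in comment 3 above can be sketched with a plain `AtomicReference`: the iterator holds its current data chunk in the reference, and `clearCurrentChunk()` is a single atomic `set(null)`, no explicit lock needed. This is illustrative only, not the actual ConsumerIterator code.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of an iterator whose buffered chunk can be dropped atomically
// by the rebalance path, so messages buffered before a rebalance are not
// replayed as duplicates afterward.
public class CurrentChunkSketch {
    private final AtomicReference<Iterator<String>> currentDataChunk = new AtomicReference<>();

    // Called by the rebalance path after the fetcher queues are cleared.
    void clearCurrentChunk() { currentDataChunk.set(null); }

    void setChunk(List<String> chunk) { currentDataChunk.set(chunk.iterator()); }

    // Simplified makeNext(): consume from the current chunk if one exists.
    String makeNext() {
        Iterator<String> chunk = currentDataChunk.get();
        if (chunk != null && chunk.hasNext()) return chunk.next();
        return null; // the real iterator would block on the fetch queue here
    }

    public static void main(String[] args) {
        CurrentChunkSketch it = new CurrentChunkSketch();
        it.setChunk(List.of("a", "b"));
        System.out.println(it.makeNext()); // prints a
        it.clearCurrentChunk();            // rebalance drops buffered data
        System.out.println(it.makeNext()); // prints null
    }
}
```

Because the close-fetchers sequence commits offsets only after both the queue and the chunk are cleared, the next `hasNext()` blocks until the fetcher restarts, which is the zero-duplicate guarantee the patch is after.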
[jira] [Commented] (KAFKA-222) Mavenize contrib
[ https://issues.apache.org/jira/browse/KAFKA-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13163235#comment-13163235 ] Jun Rao commented on KAFKA-222: --- +1 Mavenize contrib Key: KAFKA-222 URL: https://issues.apache.org/jira/browse/KAFKA-222 Project: Kafka Issue Type: Task Components: contrib Reporter: Neha Narkhede Assignee: Neha Narkhede Priority: Blocker Fix For: 0.7 Attachments: kafka-222.patch, rm_jars.sh To reduce the overhead of maintaining the NOTICE and LICENSE files, we need to mavenize most checked-in jars. Out of the current checked-in jars, the following ones cannot be mavenized -
1. lib/sbt-launch.jar: This is required to fire up the build system of Kafka (SBT)
2. core/lib/zkclient-20110412.jar: Kafka uses a patched zkclient jar
3. contrib/hadoop-consumer/lib/piggybank.jar: This is not available through Maven
4. contrib/hadoop-consumer/lib/pig-0.8.0-core.jar: This is also not available through Maven
[jira] [Commented] (KAFKA-212) IllegalThreadStateException in topic watcher for Kafka mirroring
[ https://issues.apache.org/jira/browse/KAFKA-212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13160491#comment-13160491 ] Jun Rao commented on KAFKA-212: --- We shouldn't be leaking threads. If we can get to the code that creates new MirrorThreads, the old threads should have finished, since shutdown is blocking. If the shutdown blocks forever, we won't be able to create new threads; either way, there is no thread leak. Although the latter would suggest another serious bug somewhere else. +1 on the patch. IllegalThreadStateException in topic watcher for Kafka mirroring Key: KAFKA-212 URL: https://issues.apache.org/jira/browse/KAFKA-212 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7.1 Attachments: KAFKA-212.patch If the kafka mirroring embedded consumer receives a new topic watcher notification, it runs into the following exception:

[2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException (kafka.consumer.ZookeeperTopicEventWatcher)
[2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException
    at java.lang.Thread.start(Thread.java:595)
    at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142)
    at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.server.EmbeddedConsumer.startNewConsumerThreads(KafkaServerStartable.scala:142)
    at kafka.server.EmbeddedConsumer.handleTopicEvent(KafkaServerStartable.scala:109)
    at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree2$1(ZookeeperTopicEventWatcher.scala:83)
    at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:78)
    at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) (kafka.consumer.ZookeeperTopicEventWatcher)

This happens since it tries to start a thread which has already finished executing.
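The root cause in the stack trace above is that a `java.lang.Thread` can only be started once; calling `start()` on a thread that has already terminated throws `IllegalThreadStateException`. A minimal sketch of the failure mode and the fix (allocating a fresh `Thread` per restart; names are illustrative, not the actual EmbeddedConsumer code):

```scala
// A finished Thread cannot be start()ed again; doing so throws
// IllegalThreadStateException, exactly as in the stack trace above.
val worker = new Thread(() => println("consuming..."))
worker.start()
worker.join() // thread has terminated

val restartFailed =
  try { worker.start(); false }
  catch { case _: IllegalThreadStateException => true }

// The fix: build a *new* Thread for each restart instead of reusing the old one.
def freshWorker(task: Runnable): Thread = { val t = new Thread(task); t.start(); t }
val worker2 = freshWorker(() => println("consuming again..."))
worker2.join()
```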
[jira] [Commented] (KAFKA-210) javaapi ZookeeperConsumerConnectorTest duplicates many tests in the scala version
[ https://issues.apache.org/jira/browse/KAFKA-210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13155956#comment-13155956 ] Jun Rao commented on KAFKA-210: --- Yes, I am saying that there is no need to stress test the javaapi since it's just a wrapper. All we need to test is that the java-to-scala conversion works. So one basic test should be enough for this purpose. javaapi ZookeeperConsumerConnectorTest duplicates many tests in the scala version - Key: KAFKA-210 URL: https://issues.apache.org/jira/browse/KAFKA-210 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: Jun Rao Assignee: Jun Rao Attachments: KAFKA-210.patch Since javaapi.ZookeeperConsumerConnector is just a thin wrapper over the scala version, we only need to test the basic functionality there.
[jira] [Commented] (KAFKA-204) BoundedByteBufferReceive hides OutOfMemoryError
[ https://issues.apache.org/jira/browse/KAFKA-204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13156061#comment-13156061 ] Jun Rao commented on KAFKA-204: --- Ok, I am fine with the patch then. Any objection to commit it? BoundedByteBufferReceive hides OutOfMemoryError --- Key: KAFKA-204 URL: https://issues.apache.org/jira/browse/KAFKA-204 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Chris Burroughs Assignee: Chris Burroughs Priority: Critical Attachments: k204-v1.txt

private def byteBufferAllocate(size: Int): ByteBuffer = {
  var buffer: ByteBuffer = null
  try {
    buffer = ByteBuffer.allocate(size)
  } catch {
    case e: OutOfMemoryError => throw new RuntimeException("OOME with size " + size, e)
    case e2 => throw e2
  }
  buffer
}

This hides the fact that an Error occurred, and will likely result in some log handler printing a message, instead of exiting with non-zero status. Knowing how large the allocation was that caused an OOM is really nice, so I'd suggest logging in byteBufferAllocate and then re-throwing OutOfMemoryError
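The fix suggested in the description - log the requested size, then rethrow the original OutOfMemoryError rather than wrapping it in a RuntimeException - could look roughly like the following sketch (the stderr write is a stand-in for a real logger call):

```scala
import java.nio.ByteBuffer

// Sketch: log the allocation size on OOM, then rethrow the original Error so
// that handlers and exit-on-error policies keyed on OutOfMemoryError still
// see it. Wrapping it in RuntimeException, as in the snippet above, hides
// the Error from those handlers.
def byteBufferAllocate(size: Int): ByteBuffer =
  try ByteBuffer.allocate(size)
  catch {
    case e: OutOfMemoryError =>
      System.err.println("OOME with size " + size) // stand-in for real logging
      throw e
  }

val buf = byteBufferAllocate(16)
```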
[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer
[ https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13156377#comment-13156377 ] Jun Rao commented on KAFKA-101: --- Taylor, this would suggest a synchronization issue during append to the log. Looking at the code, the append is always synchronized on the lock of the segment file. Could you reproduce the problem on 0.7? Avoid creating a new topic by the consumer -- Key: KAFKA-101 URL: https://issues.apache.org/jira/browse/KAFKA-101 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: Jun Rao Labels: patch Fix For: 0.7 Attachments: 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7, KAFKA-101-getoffsets.patch, KAFKA-101_v2.patch, KAFKA-101_v3.patch Currently, if a consumer consumes a topic and the topic doesn't exist, the topic is created automatically. Sometimes this can be confusing. Often, an ad hoc user may put a wrong topic name in the consumer and thus create many unnecessary topics.
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13155240#comment-13155240 ] Jun Rao commented on KAFKA-49: -- Prashanth, thanks for getting started on this. I agree with your second approach. Basically, add a new parameter in SyncProducer.send/multisend to indicate whether an ack is needed or not. The high level producer can then set that parameter based on ProducerConfig (probably true for sync mode and false for async mode). Another question is what kind of ack the broker sends back. A simple approach is to send back a boolean. Another possibility is to return, for each partition in the produce request, the latest offset after the request is served. Some clients could potentially make use of the returned offset. Add acknowledgement to the produce request. --- Key: KAFKA-49 URL: https://issues.apache.org/jira/browse/KAFKA-49 Project: Kafka Issue Type: Bug Reporter: Jun Rao Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request.
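The second ack option discussed above - returning, for each partition in the produce request, the latest offset after the request is served - might be modeled roughly as follows. The type and field names here are hypothetical, not the actual wire format:

```scala
// Hypothetical shape of a per-partition produce acknowledgement.
case class PartitionAck(topic: String, partition: Int, latestOffset: Long)

case class ProduceResponse(acks: Seq[PartitionAck]) {
  // A client could use the returned offsets, e.g. to resume reading
  // right after its own writes.
  def offsetFor(topic: String, partition: Int): Option[Long] =
    acks.find(a => a.topic == topic && a.partition == partition).map(_.latestOffset)
}

val resp = ProduceResponse(Seq(PartitionAck("events", 0, 42L), PartitionAck("events", 1, 7L)))
```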
[jira] [Commented] (KAFKA-211) Fix LICENSE file to include MIT and SCALA license
[ https://issues.apache.org/jira/browse/KAFKA-211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13154690#comment-13154690 ] Jun Rao commented on KAFKA-211: --- +1 Fix LICENSE file to include MIT and SCALA license - Key: KAFKA-211 URL: https://issues.apache.org/jira/browse/KAFKA-211 Project: Kafka Issue Type: Task Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7 Attachments: KAFKA-211.patch See here for reference - http://markmail.org/search/?q=kafka+0.7.0#query:kafka%200.7.0%20list%3Aorg.apache.incubator.general+page:1+mid:fiatvda3uyx2lbzb+state:results Looks like the LICENSE file should include MIT and SCALA license too
[jira] [Commented] (KAFKA-209) Remove empty directory when no log segments remain
[ https://issues.apache.org/jira/browse/KAFKA-209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13153918#comment-13153918 ] Jun Rao commented on KAFKA-209: --- Actually, when all log segments are deleted, we automatically create an empty log file whose name is the latest offset. So the log directory will never be empty. Remove empty directory when no log segments remain -- Key: KAFKA-209 URL: https://issues.apache.org/jira/browse/KAFKA-209 Project: Kafka Issue Type: Improvement Components: core Reporter: Taylor Gautier When the log cleaner runs it deletes segments from the log directory. However, if all the segments are cleaned, the log directory itself is retained. There doesn't seem to be any need for this as the directory will be re-created on write if necessary. This JIRA would improve the log cleaner to also remove unused topic directories from the log directory.
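The behavior Jun describes - after all segments are deleted, a new empty segment is created whose file name is the latest offset - can be illustrated with a small sketch. The zero-padding width and the ".log" extension below are assumptions for illustration, not necessarily the exact on-disk convention of every Kafka version:

```scala
// Illustrative: derive the new (empty) segment's file name from the latest
// offset, so the next append continues at that offset. Padding width and
// extension are assumed here, not taken from the Kafka source.
def segmentFileName(latestOffset: Long): String =
  f"$latestOffset%020d.log"

val name = segmentFileName(1000L)
```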
[jira] [Commented] (KAFKA-204) BoundedByteBufferReceive hides OutOfMemoryError
[ https://issues.apache.org/jira/browse/KAFKA-204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13153919#comment-13153919 ] Jun Rao commented on KAFKA-204: --- Hmm, it doesn't look like OutOfMemoryError allows one to specify a cause during initialization. BoundedByteBufferReceive hides OutOfMemoryError --- Key: KAFKA-204 URL: https://issues.apache.org/jira/browse/KAFKA-204 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Chris Burroughs Assignee: Chris Burroughs Priority: Critical Attachments: k204-v1.txt

private def byteBufferAllocate(size: Int): ByteBuffer = {
  var buffer: ByteBuffer = null
  try {
    buffer = ByteBuffer.allocate(size)
  } catch {
    case e: OutOfMemoryError => throw new RuntimeException("OOME with size " + size, e)
    case e2 => throw e2
  }
  buffer
}

This hides the fact that an Error occurred, and will likely result in some log handler printing a message, instead of exiting with non-zero status. Knowing how large the allocation was that caused an OOM is really nice, so I'd suggest logging in byteBufferAllocate and then re-throwing OutOfMemoryError
[jira] [Commented] (KAFKA-209) Remove empty directory when no log segments remain
[ https://issues.apache.org/jira/browse/KAFKA-209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13153987#comment-13153987 ] Jun Rao commented on KAFKA-209: --- It should behave that way in both 0.6 and 0.7. If not, it's a bug. Remove empty directory when no log segments remain -- Key: KAFKA-209 URL: https://issues.apache.org/jira/browse/KAFKA-209 Project: Kafka Issue Type: Improvement Components: core Reporter: Taylor Gautier When the log cleaner runs it deletes segments from the log directory. However, if all the segments are cleaned, the log directory itself is retained. There doesn't seem to be any need for this as the directory will be re-created on write if necessary. This JIRA would improve the log cleaner to also remove unused topic directories from the log directory.
[jira] [Commented] (KAFKA-204) BoundedByteBufferReceive hides OutOfMemoryError
[ https://issues.apache.org/jira/browse/KAFKA-204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13151297#comment-13151297 ] Jun Rao commented on KAFKA-204: --- The main reason this was added is to show the requested size and the caller that triggered such a request. It would be nice if both pieces are logged together. With the new patch, those two pieces are logged separately (although they should be close) and someone has to link them together manually. BoundedByteBufferReceive hides OutOfMemoryError --- Key: KAFKA-204 URL: https://issues.apache.org/jira/browse/KAFKA-204 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Chris Burroughs Assignee: Chris Burroughs Priority: Critical Attachments: k204-v1.txt

private def byteBufferAllocate(size: Int): ByteBuffer = {
  var buffer: ByteBuffer = null
  try {
    buffer = ByteBuffer.allocate(size)
  } catch {
    case e: OutOfMemoryError => throw new RuntimeException("OOME with size " + size, e)
    case e2 => throw e2
  }
  buffer
}

This hides the fact that an Error occurred, and will likely result in some log handler printing a message, instead of exiting with non-zero status. Knowing how large the allocation was that caused an OOM is really nice, so I'd suggest logging in byteBufferAllocate and then re-throwing OutOfMemoryError
[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer
[ https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150246#comment-13150246 ] Jun Rao commented on KAFKA-101: --- Actually, another thing. If a topic doesn't already exist, the consumer shouldn't create the ZK path /broker/topics/topic. Currently, we do the following in ZookeeperConsumerConnector.consume: ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath) We should remove that line. There are a couple of things to check. (1) Whether the consumer can still get the watcher triggered when the topic is created (by a producer). I think this should happen since ZKClient registers an Exists watcher if the path doesn't exist. But we should check. (2) There are probably places in the rebalance code where we should handle a missing ZK path /broker/topics/topic properly. Avoid creating a new topic by the consumer -- Key: KAFKA-101 URL: https://issues.apache.org/jira/browse/KAFKA-101 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: Jun Rao Labels: patch Attachments: 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7 Currently, if a consumer consumes a topic and the topic doesn't exist, the topic is created automatically. Sometimes this can be confusing. Often, an ad hoc user may put a wrong topic name in the consumer and thus create many unnecessary topics.
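The change sketched in the comment above - drop the makeSurePersistentPathExists call and rely on an exists-watch instead - amounts to something like the following. The ZK client is reduced to a hypothetical trait with an in-memory fake so the sketch is self-contained; it is not the real ZkClient API:

```scala
// Hypothetical, pared-down ZK client interface for illustration only.
trait ZkLike {
  def exists(path: String): Boolean
  def watchExists(path: String, onCreate: () => Unit): Unit
}

// Instead of creating /brokers/topics/<topic> from the consumer side,
// only register an exists-watch; the producer creates the path.
def subscribeWithoutCreating(zk: ZkLike, topicPath: String)(onReady: () => Unit): Unit =
  if (zk.exists(topicPath)) onReady()
  else zk.watchExists(topicPath, onReady) // fires when a producer creates the topic

// In-memory fake to exercise the flow: the topic does not exist yet, so the
// consumer only registers a watch; "creating" the topic later fires it.
class FakeZk extends ZkLike {
  private var paths = Set.empty[String]
  private var watches = Map.empty[String, () => Unit]
  def exists(path: String): Boolean = paths(path)
  def watchExists(path: String, onCreate: () => Unit): Unit = watches += path -> onCreate
  def create(path: String): Unit = {
    paths += path
    watches.get(path).foreach(cb => cb())
    watches -= path
  }
}

var consumerReady = false
val zk = new FakeZk
subscribeWithoutCreating(zk, "/brokers/topics/test")(() => consumerReady = true)
val readyBeforeCreate = consumerReady // still false: nothing was created
zk.create("/brokers/topics/test")     // producer creates the topic; watch fires
```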
[jira] [Commented] (KAFKA-50) kafka intra-cluster replication support
[ https://issues.apache.org/jira/browse/KAFKA-50?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149400#comment-13149400 ] Jun Rao commented on KAFKA-50: -- The dependencies of the sub-jiras look like the following: 48 49 47 - 46 - 43 44/4542 41 This means that initially, 47,48,49 can be worked on independently. kafka intra-cluster replication support --- Key: KAFKA-50 URL: https://issues.apache.org/jira/browse/KAFKA-50 Project: Kafka Issue Type: New Feature Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: kafka_replication_highlevel_design.pdf, kafka_replication_lowlevel_design.pdf Currently, Kafka doesn't have replication. Each log segment is stored in a single broker. This limits both the availability and the durability of Kafka. If a broker goes down, all log segments stored on that broker become unavailable to consumers. If a broker dies permanently (e.g., disk failure), all unconsumed data on that node is lost forever. Our goal is to replicate every log segment to multiple broker nodes to improve both the availability and the durability. We'd like to support the following in Kafka replication: 1. Configurable synchronous and asynchronous replication 2. Small unavailable window (e.g., less than 5 seconds) during broker failures 3. Auto recovery when a failed broker rejoins 4. Balanced load when a broker fails (i.e., the load on the failed broker is evenly spread among multiple surviving brokers)
[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer
[ https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149414#comment-13149414 ] Jun Rao commented on KAFKA-101: --- Thanks for the patch. The test failures seem to be transient and are the result of time dependencies, which we should fix separately. Could you add a unit test on SimpleConsumer to consume a non-existing topic/partition? It should return an empty messageSet. Other than that, the patch looks good. Avoid creating a new topic by the consumer -- Key: KAFKA-101 URL: https://issues.apache.org/jira/browse/KAFKA-101 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: Jun Rao Labels: patch Attachments: 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7 Currently, if a consumer consumes a topic and the topic doesn't exist, the topic is created automatically. Sometimes this can be confusing. Often, an ad hoc user may put a wrong topic name in the consumer and thus create many unnecessary topics.
[jira] [Commented] (KAFKA-201) Support for mirroring from multiple sources
[ https://issues.apache.org/jira/browse/KAFKA-201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149423#comment-13149423 ] Jun Rao commented on KAFKA-201: --- This should be relatively easy. We can just extend KafkaServerStartable to take an array of ConsumerConfig and create multiple instances of EmbeddedConsumer. Support for mirroring from multiple sources --- Key: KAFKA-201 URL: https://issues.apache.org/jira/browse/KAFKA-201 Project: Kafka Issue Type: New Feature Reporter: Paul Querna Currently the EmbeddedConsumer is configured against a single source mirror. We have a use case of consuming from multiple source clusters. Simple example: we have 3 datacenters which are collecting data: A, B, C. We want all 3 to get full copies of the data eventually, by using a whitelist of topics and having the topics include the source data centers. So, we would like to be able to: Configure A to mirror B and C with a whitelist of topics B,C Configure B to mirror A and C with a whitelist of topics A,C Configure C to mirror A and B with a whitelist of topics A,B
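The extension Jun suggests above - taking an array of ConsumerConfig and creating one EmbeddedConsumer per source cluster - could be sketched like this. The class shapes are stand-ins for the real KafkaServerStartable/EmbeddedConsumer, not the actual API:

```scala
// Stand-in types: the real ConsumerConfig/EmbeddedConsumer carry much more state.
case class ConsumerConfig(zkConnect: String, whitelist: String)

class EmbeddedConsumer(val config: ConsumerConfig) {
  def startup(): Unit = println(s"mirroring ${config.whitelist} from ${config.zkConnect}")
  def shutdown(): Unit = ()
}

// One EmbeddedConsumer per source cluster, as proposed in the comment.
class MirroringServer(configs: Seq[ConsumerConfig]) {
  val consumers: Seq[EmbeddedConsumer] = configs.map(new EmbeddedConsumer(_))
  def startup(): Unit = consumers.foreach(_.startup())
  def shutdown(): Unit = consumers.foreach(_.shutdown())
}

// Datacenter A mirroring B and C, per the example in the description.
val server = new MirroringServer(Seq(
  ConsumerConfig("zk-b:2181", "B,C"),
  ConsumerConfig("zk-c:2181", "B,C")
))
server.startup()
```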
[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer
[ https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149426#comment-13149426 ] Jun Rao commented on KAFKA-101: --- Yes, since a test in SimpleConsumer will further test the wire protocol. Thanks, Avoid creating a new topic by the consumer -- Key: KAFKA-101 URL: https://issues.apache.org/jira/browse/KAFKA-101 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: Jun Rao Labels: patch Attachments: 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7 Currently, if a consumer consumes a topic and the topic doesn't exist, the topic is created automatically. Sometimes this can be confusing. Often, an ad hoc user may put a wrong topic name in the consumer and thus create many unnecessary topics.
[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer
[ https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149442#comment-13149442 ] Jun Rao commented on KAFKA-101: --- We can add it in PrimitiveApiTest. Avoid creating a new topic by the consumer -- Key: KAFKA-101 URL: https://issues.apache.org/jira/browse/KAFKA-101 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: Jun Rao Labels: patch Attachments: 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7 Currently, if a consumer consumes a topic and the topic doesn't exist, the topic is created automatically. Sometimes this can be confusing. Often, an ad hoc user may put a wrong topic name in the consumer and thus create many unnecessary topics.