[jira] [Commented] (KAFKA-332) Mirroring should use multiple producers; add producer retries to DefaultEventHandler

2012-04-17 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13256026#comment-13256026
 ] 

Jun Rao commented on KAFKA-332:
---

Some comments:
1. DefaultEventHandler: 
1.1 It would be useful to see the retry # in the trace log.
1.2 We should capture all Throwables.

2. ProducerConfig: explain a bit more why num.retries is not appropriate for the 
zk-based producer. Basically, during a resend, we don't re-select brokers.

3. MirrorMaker: The usage of circularIterator is pretty fancy. Would it be 
simpler to just put all producers in an array and loop through it circularly? 
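
For illustration, the array-based round-robin could be as simple as the following 
sketch (numProducers, producerConfig and the Producer type parameters are 
placeholders, not names from the patch):

  import java.util.concurrent.atomic.AtomicInteger

  // Hold all producers in an array and hand them out round-robin.
  val producers = Array.fill(numProducers)(new Producer[String, Message](producerConfig))
  private val counter = new AtomicInteger(0)

  def nextProducer() =
    producers((counter.getAndIncrement & Int.MaxValue) % producers.length)  // mask the sign bit on overflow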

 Mirroring should use multiple producers; add producer retries to 
 DefaultEventHandler
 

 Key: KAFKA-332
 URL: https://issues.apache.org/jira/browse/KAFKA-332
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
Priority: Minor
 Attachments: KAFKA-332-v1.patch


 I'm clubbing these two together as these are both important for mirroring.
 (1) Multiple producers:
 Shallow iteration (KAFKA-315) helps improve mirroring throughput when
 messages are compressed. With shallow iteration, the mirror-maker's consumer
 does not do deep iteration over compressed messages. However, when its
 embedded producer sends these messages to the target cluster's brokers, the
 receiving broker does deep iteration to validate the messages before
 appending to the log.
 In the current (pre- KAFKA-48) request handling mechanism, one producer
 effectively translates to one server-side thread for handling produce
 requests, so there is still a bottleneck due to decompression (due to
 message validation) on the target broker.
 One way to work around this is to use broker.list with multiple entries
 specified per physical broker. E.g.,
 broker.list=0:localhost:9191,1:localhost:9191,2:localhost:9191,... which
 effectively emulates multiple server-side threads. It would be better to
 just add a num.producers option to the mirror-maker and instantiate that
 many producers.
 (2) Retries:
 If the mirror-maker uses broker.list and one of the brokers is bounced for
 any reason, messages can get lost. Message loss can be reduced/avoided if
 the brokers are behind a VIP and if retries are supported. This option will
 not work for the zk-based producer because the decision of which broker to
 send to has already been made, so retries would go to the same (potentially
 still down) broker. (With KAFKA-253 it would work for zk-based producers as
 well, but that is only in 0.8).
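
 To make the retry proposal concrete, the send path could look roughly like the
 sketch below; numRetries, backoffMs and sendToBroker are hypothetical names, and
 trace() is assumed to come from the usual Logging trait (this is not the attached patch):

   // Sketch of send-with-retries: log the attempt number at trace level (comment 1.1)
   // and catch Throwable so no failure type is silently dropped (comment 1.2).
   def sendWithRetries(messages: Seq[Message]) {
     var attemptsLeft = numRetries + 1
     var sent = false
     while (!sent) {
       attemptsLeft -= 1
       try {
         trace("Producer send attempt %d".format(numRetries + 1 - attemptsLeft))
         sendToBroker(messages)        // placeholder for the real send
         sent = true
       } catch {
         case t: Throwable =>
           if (attemptsLeft <= 0) throw t
           Thread.sleep(backoffMs)     // back off before re-sending to the VIP
       }
     }
   }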

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-331) recurrent produce errors

2012-04-12 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13252476#comment-13252476
 ] 

Jun Rao commented on KAFKA-331:
---

The producer is thread safe. Are you using the sync or the async mode in the 
producer? Do you have a simple test that can reproduce this?

 recurrent produce errors
 

 Key: KAFKA-331
 URL: https://issues.apache.org/jira/browse/KAFKA-331
 Project: Kafka
  Issue Type: Bug
Reporter: Pierre-Yves Ritschard

 I am using trunk and regularly see such errors popping up:
 32477890 [kafka-processor-7] ERROR kafka.server.KafkaRequestHandlers  - Error 
 processing ProduceRequest on pref^@^@^@:0
 java.io.FileNotFoundException: 
 /mnt/kafka/logs/pref^@^@^@-0/.kafka (Is a directory)
 at java.io.RandomAccessFile.open(Native Method)
 at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
 at kafka.utils.Utils$.openChannel(Utils.scala:324)
 at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
 at kafka.log.Log.loadSegments(Log.scala:144)
 at kafka.log.Log.<init>(Log.scala:116)
 at kafka.log.LogManager.createLog(LogManager.scala:149)
 at kafka.log.LogManager.getOrCreateLog(LogManager.scala:204)
 at 
 kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
 at 
 kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
 at kafka.network.Processor.handle(SocketServer.scala:296)
 at kafka.network.Processor.read(SocketServer.scala:319)
 at kafka.network.Processor.run(SocketServer.scala:214)
 at java.lang.Thread.run(Thread.java:679)
 32477890 [kafka-processor-7] ERROR kafka.network.Processor  - Closing socket 
 for /xx.xx.xx.xx because of error
 java.io.FileNotFoundException: 
 /mnt/kafka/logs/pref^@^@^@-0/.kafka (Is a directory)
 at java.io.RandomAccessFile.open(Native Method)
 at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
 at kafka.utils.Utils$.openChannel(Utils.scala:324)
 at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
 at kafka.log.Log.loadSegments(Log.scala:144)
 at kafka.log.Log.<init>(Log.scala:116)
 at kafka.log.LogManager.createLog(LogManager.scala:149)
 at kafka.log.LogManager.getOrCreateLog(LogManager.scala:204)
 at 
 kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
 at 
 kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
 at kafka.network.Processor.handle(SocketServer.scala:296)
 at kafka.network.Processor.read(SocketServer.scala:319)
 at kafka.network.Processor.run(SocketServer.scala:214)
 at java.lang.Thread.run(Thread.java:679)
 This results in a pref directory created inside the log dir. The original
 topic should be prefix; somehow NUL bytes get inserted there.
 The producing was done with a kafka.javaapi.producer.Producer instance, on 
 which send was called with a kafka.javaapi.producer.ProducerData instance.
 There are no log entries created inside that dir and no impact on the overall
 operation of the broker or the consumers.
 Is the producer thread-safe?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-301) Implement the broker startup procedure

2012-04-11 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13251711#comment-13251711
 ] 

Jun Rao commented on KAFKA-301:
---

Can't seem to apply patch v1 to 0.8 branch.

patching file core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
patching file core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
patching file core/src/test/scala/unit/kafka/server/StateChangeTest.scala
patching file core/src/main/scala/kafka/producer/Producer.scala
patching file core/src/main/scala/kafka/producer/async/QueueFullException.scala
patching file core/src/main/scala/kafka/admin/AdminUtils.scala
patching file 
core/src/main/scala/kafka/common/NoEpochForPartitionException.scala
can't find file to patch at input line 374
Perhaps you used the wrong -p or --strip option?
The text leading up to this was:
--
|Index: core/src/main/scala/kafka/common/QueueFullException.scala
|===
|--- core/src/main/scala/kafka/common/QueueFullException.scala  (revision 
1304473)
|+++ core/src/main/scala/kafka/common/QueueFullException.scala  (working copy)
--
File to patch: 


 Implement the broker startup procedure
 --

 Key: KAFKA-301
 URL: https://issues.apache.org/jira/browse/KAFKA-301
 Project: Kafka
  Issue Type: Sub-task
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Attachments: kafka-301-draft.patch, kafka-301-v1.patch


 This JIRA will involve implementing the list of actions to be taken on broker 
 startup, as listed by the brokerStartup() and startReplica() algorithm in the 
 Kafka replication design doc. Since the stateChangeListener is part of 
 KAFKA-44, this JIRA can leave it as a stub.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request

2012-04-10 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13250856#comment-13250856
 ] 

Jun Rao commented on KAFKA-48:
--

Patch v4 looks good. Just one more comment.

41. RequestPurgatory.update(): if(w == null), could we return a singleton empty 
array, instead of creating a new one every time?
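
Something along these lines would do (a sketch; DelayedRequest is assumed here to 
be the element type returned by update()):

  // One shared, immutable empty array instead of a fresh allocation per call.
  private val NoDelayedRequests = Array.empty[DelayedRequest]

  // inside update():
  if (w == null)
    return NoDelayedRequests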

 Implement optional long poll support in fetch request
 ---

 Key: KAFKA-48
 URL: https://issues.apache.org/jira/browse/KAFKA-48
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Jay Kreps
 Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48-v4.patch, 
 KAFKA-48.patch, kafka-48-v3-to-v4-changes.diff


 Currently, the fetch request is non-blocking. If there is nothing on the 
 broker for the consumer to retrieve, the broker simply returns an empty set 
 to the consumer. This can be inefficient, if you want to ensure low-latency 
 because you keep polling over and over. We should make a blocking version of 
 the fetch request so that the fetch request is not returned until the broker 
 has at least one message for the fetcher or some timeout passes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-249) Separate out Kafka mirroring into a stand-alone app

2012-04-06 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13248898#comment-13248898
 ] 

Jun Rao commented on KAFKA-249:
---

Actually, just committed kafka-315. Could you rebase?

 Separate out Kafka mirroring into a stand-alone app
 ---

 Key: KAFKA-249
 URL: https://issues.apache.org/jira/browse/KAFKA-249
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
 Fix For: 0.7.1

 Attachments: KAFKA-249.v1.patch, KAFKA-249.v2.patch, 
 KAFKA-249.v3-v4.incremental.patch, KAFKA-249.v3.patch, KAFKA-249.v4.patch


 I would like to discuss, on this JIRA, the feasibility/benefits of separating
 out Kafka's mirroring feature from the broker into a stand-alone app, as the
 current approach has a couple of limitations and issues.
 For example, we recently had to deal with Kafka mirrors that were in fact
 idle because mirror threads were not created at start-up due to a rebalancing
 exception, yet the Kafka broker itself did not shut down. This has since been
 fixed, but is indicative of (avoidable) problems in embedding non-broker-specific
 features in the broker.
 Logically, it seems to make sense to separate it out to achieve better
 division of labor.  Furthermore, enhancements to mirroring may be less
 clunky to implement and use with a stand-alone app.  For example to support
 custom partitioning on the target cluster, or to mirror from multiple
 clusters we would probably need to be able to pass in multiple embedded
 consumer/embedded producer configs, which would be less ugly if the
 mirroring process were a stand-alone app.  Also, if we break it out, it
 would be convenient to use as a consumption engine for the console
 consumer which will make it easier to add on features such as wildcards in
 topic consumption, since it contains a ZooKeeper topic discovery component.
 Any suggestions and/or objections to this?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

2012-04-06 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13249129#comment-13249129
 ] 

Jun Rao commented on KAFKA-320:
---

Ok, we can take v3 for now and track the broker startup/shutdown in kafka-328.

 testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
 --

 Key: KAFKA-320
 URL: https://issues.apache.org/jira/browse/KAFKA-320
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7, 0.8
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Critical
 Attachments: kafka-320-v2.patch, kafka-320-v3-delta.patch, 
 kafka-320-v3.patch, kafka-320.patch


 The testZKSendWithDeadBroker inside ProducerTest fails intermittently with 
 the following exception -
 [error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
 java.lang.RuntimeException: A broker is already registered on the path 
 /brokers/ids/0. This probably indicates that you either have configured a 
 brokerid that is already in use, or else you have shutdown this broker and 
 restarted it faster than the zookeeper timeout so it appears to be 
 re-registering.
 at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
 at 
 kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
 at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
 at 
 kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:174)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at junit.framework.TestCase.runTest(TestCase.java:164)
 at junit.framework.TestCase.runBare(TestCase.java:130)
 at junit.framework.TestResult$1.protect(TestResult.java:110)
 at junit.framework.TestResult.runProtected(TestResult.java:128)
 at junit.framework.TestResult.run(TestResult.java:113)
 at junit.framework.TestCase.run(TestCase.java:120)
 at junit.framework.TestSuite.runTest(TestSuite.java:228)
 at junit.framework.TestSuite.run(TestSuite.java:223)
 at junit.framework.TestSuite.runTest(TestSuite.java:228)
 at junit.framework.TestSuite.run(TestSuite.java:223)
 at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
 at 
 org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
 at sbt.TestRunner.run(TestFramework.scala:53)
 at sbt.TestRunner.runTest$1(TestFramework.scala:67)
 at sbt.TestRunner.run(TestFramework.scala:76)
 at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
 at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
 at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
 at sbt.NamedTestTask.run(TestFramework.scala:92)
 at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
 at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
 at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
 at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
 at sbt.impl.RunTask.runTask(RunTask.scala:85)
 at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
 at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
 at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
 at 
 sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
 at 
 sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
 at sbt.Control$.trapUnit(Control.scala:19)
 at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)
 The test basically restarts a server and fails with this exception during the
 restart.
 This is unexpected, since shutting down server1 should trigger the
 deletion of its broker id registration from ZK. But here is the Kafka
 bug causing this problem -
 In the test, during server1.shutdown() we do close the zkClient associated
 with the broker, and it successfully deletes the broker's registration info
 from ZooKeeper. After this, server1 can be successfully started. Then the test
 completes and in the teardown(), we call server1.shutdown(). During 

[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request

2012-04-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13247409#comment-13247409
 ] 

Jun Rao commented on KAFKA-48:
--

Thanks for patch v3. Some comments:

31. DelayedFetch is keyed off topic. It should be keyed off (topic, partition) 
since a consumer may be interested in only a subset of partitions within a 
topic.

32. KafkaApis: The following 3 lines are duplicated in 2 places.
  val topicData = readMessageSets(delayed.fetch.offsetInfo)
  val response = new FetchResponse(FetchRequest.CurrentVersion, 
delayed.fetch.correlationId, topicData)
  requestChannel.sendResponse(new RequestChannel.Response(delayed.request, 
new FetchResponseSend(response, ErrorMapping.NoError), -1))
Should we put them in a private method and share the code? (A possible shape for 
such a helper is sketched after these comments.)

33. ExpiredRequestReaper.purgeExpired(): We need to decrement unsatisfied count 
here.

34. FetchRequest: Can we have the default constants for correlationId, 
clientId, etc. defined and shared between the constructor and the request builder?

35. MessageSetSend.empty is unused. Should we remove it?
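
Regarding 32, the shared helper might look roughly like this (the name 
sendDelayedFetchResponse is hypothetical; the body is just the duplicated code 
quoted above):

  private def sendDelayedFetchResponse(delayed: DelayedFetch) {
    val topicData = readMessageSets(delayed.fetch.offsetInfo)
    val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
    requestChannel.sendResponse(new RequestChannel.Response(delayed.request,
      new FetchResponseSend(response, ErrorMapping.NoError), -1))
  }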

 Implement optional long poll support in fetch request
 ---

 Key: KAFKA-48
 URL: https://issues.apache.org/jira/browse/KAFKA-48
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Jay Kreps
 Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48.patch


 Currently, the fetch request is non-blocking. If there is nothing on the 
 broker for the consumer to retrieve, the broker simply returns an empty set 
 to the consumer. This can be inefficient, if you want to ensure low-latency 
 because you keep polling over and over. We should make a blocking version of 
 the fetch request so that the fetch request is not returned until the broker 
 has at least one message for the fetcher or some timeout passes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.

2012-04-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13247458#comment-13247458
 ] 

Jun Rao commented on KAFKA-49:
--

Prashanth,

Thanks for the patch. It seems that kafka-48 is almost ready. Since that's a 
relatively large patch, I will commit your patch after kafka-48 is committed.

 Add acknowledgement to the produce request.
 ---

 Key: KAFKA-49
 URL: https://issues.apache.org/jira/browse/KAFKA-49
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Prashanth Menon
 Fix For: 0.8

 Attachments: KAFKA-49-continued-v2.patch, KAFKA-49-continued.patch, 
 KAFKA-49-v1.patch, KAFKA-49-v2.patch, KAFKA-49-v3.patch


 Currently, the produce request doesn't get acknowledged. We need to have a 
 broker send a response to the producer and have the producer wait for the 
 response before sending the next request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-249) Separate out Kafka mirroring into a stand-alone app

2012-04-04 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13246497#comment-13246497
 ] 

Jun Rao commented on KAFKA-249:
---

v2 looks much better now. Some additional comments:

21. Should isTopicAllowed be part of TopicFilterSpec, especially if we want to 
extend it in the future? If so, we don't need TopicFilter.
22. In this patch, I suggest that we only put message and topic in 
MessageAndMetadata. We can have a separate jira on how to expose offsets to the 
consumer. There, we need to discuss how a consumer can rewind the consumption 
using the offset returned.
23. It's probably better to rename KafkaMessageAndMetadataStream to KafkaStream.
24. ZookeeperConsumerConnector:
24.1 reinitializeConsumer:  I think it will make the code easier to understand 
if we explicitly define the type of val consumerThreadIdsPerTopic, 
topicThreadIds and threadQueueStreamPairs. It would also be very useful to 
explicitly define the return type of consumerThreadIdsPerTopic.flatten.
24.2 reinitializeConsumer: This method is called every time a new topic is 
discovered. It feels strange that we have to register the consumer here. 
Ideally, each consumer is registered exactly once. Also, it seems that each 
time this method is called, we only add new entries to 
loadBalancerListener.kafkaMessageAndMetadataStreams. Shouldn't we clear this 
map first so that deleted topics can be removed?

25. ByteBufferMessageSet: It's not clear to me if the iterator of 
ByteBufferMessageSet should return MessageAndMetadata. This is because 
ByteBufferMessageSet itself doesn't know all the metadata, such as topic and 
partition. So, it seems the iterator of this class should probably remain 
MessageAndOffset. MessageAndMetadata is only used for the client api.

26. MirrorMaker: The shutdown hook should close producer.


 Separate out Kafka mirroring into a stand-alone app
 ---

 Key: KAFKA-249
 URL: https://issues.apache.org/jira/browse/KAFKA-249
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
 Fix For: 0.7.1

 Attachments: KAFKA-249.v1.patch, KAFKA-249.v2.patch, 
 KAFKA-249.v3.patch


 I would like to discuss, on this JIRA, the feasibility/benefits of separating
 out Kafka's mirroring feature from the broker into a stand-alone app, as the
 current approach has a couple of limitations and issues.
 For example, we recently had to deal with Kafka mirrors that were in fact
 idle because mirror threads were not created at start-up due to a rebalancing
 exception, yet the Kafka broker itself did not shut down. This has since been
 fixed, but is indicative of (avoidable) problems in embedding non-broker-specific
 features in the broker.
 Logically, it seems to make sense to separate it out to achieve better
 division of labor.  Furthermore, enhancements to mirroring may be less
 clunky to implement and use with a stand-alone app.  For example to support
 custom partitioning on the target cluster, or to mirror from multiple
 clusters we would probably need to be able to pass in multiple embedded
 consumer/embedded producer configs, which would be less ugly if the
 mirroring process were a stand-alone app.  Also, if we break it out, it
 would be convenient to use as a consumption engine for the console
 consumer which will make it easier to add on features such as wildcards in
 topic consumption, since it contains a ZooKeeper topic discovery component.
 Any suggestions and/or objections to this?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

2012-04-02 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13244395#comment-13244395
 ] 

Jun Rao commented on KAFKA-320:
---

The ZookeeperTestHarness change looks nice. A couple more comments:

4. KafkaServer: It does look a bit more complex now and some of the testing is 
not done atomically. How about the following (a rough sketch follows after these comments)? 
4.1 add an AtomicBoolean isServerStartable and initialize it to true;
4.2 in startup(), if we can atomically set isServerStartable from true to 
false, proceed with startup; otherwise throw an exception.
4.3 in shutdown(), if isServerStartable is false, proceed with shutdown; at the 
very end, set isServerStartable back to true. 
Startup() and shutdown() are expected to be called from the same thread. So we 
can expect a shutdown won't be called until a startup completes.

5. SyncProducerTest: unused imports
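
A rough sketch of the scheme in comment 4 (names follow the comment; this is not 
the committed code):

  import java.util.concurrent.atomic.AtomicBoolean

  private val isServerStartable = new AtomicBoolean(true)

  def startup() {
    // 4.2: proceed only if we atomically flip true -> false
    if (!isServerStartable.compareAndSet(true, false))
      throw new IllegalStateException("Kafka server is already started or still shutting down")
    // ... start log manager, socket server, ZK registration ...
  }

  def shutdown() {
    // 4.3: only shut down a server that was actually started
    if (!isServerStartable.get) {
      // ... stop components ...
      isServerStartable.set(true)   // allow a subsequent startup
    }
  }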


 testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
 --

 Key: KAFKA-320
 URL: https://issues.apache.org/jira/browse/KAFKA-320
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7, 0.8
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Critical
 Attachments: kafka-320-v2.patch, kafka-320.patch


 The testZKSendWithDeadBroker inside ProducerTest fails intermittently with 
 the following exception -
 [error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
 java.lang.RuntimeException: A broker is already registered on the path 
 /brokers/ids/0. This probably indicates that you either have configured a 
 brokerid that is already in use, or else you have shutdown this broker and 
 restarted it faster than the zookeeper timeout so it appears to be 
 re-registering.
 at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
 at 
 kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
 at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
 at 
 kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:174)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at junit.framework.TestCase.runTest(TestCase.java:164)
 at junit.framework.TestCase.runBare(TestCase.java:130)
 at junit.framework.TestResult$1.protect(TestResult.java:110)
 at junit.framework.TestResult.runProtected(TestResult.java:128)
 at junit.framework.TestResult.run(TestResult.java:113)
 at junit.framework.TestCase.run(TestCase.java:120)
 at junit.framework.TestSuite.runTest(TestSuite.java:228)
 at junit.framework.TestSuite.run(TestSuite.java:223)
 at junit.framework.TestSuite.runTest(TestSuite.java:228)
 at junit.framework.TestSuite.run(TestSuite.java:223)
 at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
 at 
 org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
 at sbt.TestRunner.run(TestFramework.scala:53)
 at sbt.TestRunner.runTest$1(TestFramework.scala:67)
 at sbt.TestRunner.run(TestFramework.scala:76)
 at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
 at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
 at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
 at sbt.NamedTestTask.run(TestFramework.scala:92)
 at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
 at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
 at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
 at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
 at sbt.impl.RunTask.runTask(RunTask.scala:85)
 at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
 at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
 at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
 at 
 sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
 at 
 sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
 at sbt.Control$.trapUnit(Control.scala:19)
 at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

[jira] [Commented] (KAFKA-249) Separate out Kafka mirroring into a stand-alone app

2012-03-27 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13240166#comment-13240166
 ] 

Jun Rao commented on KAFKA-249:
---

Thanks for the patch. Some comments:

1. For future extension, I am thinking that we should probably unify 
KafkaMessageStream and KafkaMessageAndTopicStream into something like 
KafkaMessageMetadataStream. The stream gives an iterator of Message and its 
associated metadata. For now, the metadata can be just the topic. In the future, 
it may include things like partition id and offset.

2. ZookeeperConsumerConnector:
2.1 updateFetcher: no need to pass in messageStreams
2.2 ZKRebalancerListener: It seems that kafkaMessageStream can be immutable.
2.3 createMessageStreamByFilter: topicsStreamsMap is empty when passed to 
ZKRebalanceListener. This means that the queue is not cleared during rebalance.
2.4 consumeWildCardTopics: I find it hard to read the code in this method. Is 
there a real benefit to using implicit conversion here, instead of explicit 
conversion? It's not clear to me where the conversion is used. The 2-level 
tuple makes it hard to figure out what the referred fields represent. Is the 
code relying on groupedTopicThreadIds being sorted by (topic, threadid)? If so, 
where is that enforced?

3. KafkaServerStartable: Should we remove the embedded consumer now?

4. Utils, UtilsTest: unused import


 Separate out Kafka mirroring into a stand-alone app
 ---

 Key: KAFKA-249
 URL: https://issues.apache.org/jira/browse/KAFKA-249
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
 Fix For: 0.7.1

 Attachments: KAFKA-249.v1.patch, KAFKA-249.v2.patch


 I would like to discuss, on this JIRA, the feasibility/benefits of separating
 out Kafka's mirroring feature from the broker into a stand-alone app, as the
 current approach has a couple of limitations and issues.
 For example, we recently had to deal with Kafka mirrors that were in fact
 idle because mirror threads were not created at start-up due to a rebalancing
 exception, yet the Kafka broker itself did not shut down. This has since been
 fixed, but is indicative of (avoidable) problems in embedding non-broker-specific
 features in the broker.
 Logically, it seems to make sense to separate it out to achieve better
 division of labor.  Furthermore, enhancements to mirroring may be less
 clunky to implement and use with a stand-alone app.  For example to support
 custom partitioning on the target cluster, or to mirror from multiple
 clusters we would probably need to be able to pass in multiple embedded
 consumer/embedded producer configs, which would be less ugly if the
 mirroring process were a stand-alone app.  Also, if we break it out, it
 would be convenient to use as a consumption engine for the console
 consumer which will make it easier to add on features such as wildcards in
 topic consumption, since it contains a ZooKeeper topic discovery component.
 Any suggestions and/or objections to this?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-307) Refactor server code to remove interdependencies between LogManager and KafkaZooKeeper

2012-03-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13236734#comment-13236734
 ] 

Jun Rao commented on KAFKA-307:
---

+1 on patch v3.

 Refactor server code to remove interdependencies between LogManager and 
 KafkaZooKeeper
 --

 Key: KAFKA-307
 URL: https://issues.apache.org/jira/browse/KAFKA-307
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.7, 0.8
Reporter: Neha Narkhede
 Attachments: kafka-307-draft.patch, kafka-307-v2.patch, 
 kafka-307-v3.patch


 Currently, LogManager wraps KafkaZooKeeper, which is meant to handle all ZooKeeper 
 interaction of a Kafka server. With replication, KafkaZooKeeper will handle 
 leader election, various state change listeners, and then start replicas. Due 
 to the interdependency between LogManager and KafkaZooKeeper, starting replicas 
 is not possible until LogManager starts up completely. As a result, we have 
 to separate out the broker startup procedures required for replication to get 
 around this problem.
 It will be good to refactor and clean up the server code, before diving 
 deeper into replication.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

2012-03-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13236782#comment-13236782
 ] 

Jun Rao commented on KAFKA-305:
---

Prashanth,

v2 patch looks good. 

As for 5, I do see transient failures of testZKSendWithDeadBroker. This is a 
bit weird. During broker shutdown, we close the ZK client, which should cause 
all ephemeral nodes to be deleted in ZK. Could you verify if this is indeed the 
behavior of ZK?

As for BrokerPartitionInfo and ProducerPool, we should clean up dead brokers. 
Could you open a separate jira to track that?

 SyncProducer does not correctly timeout
 ---

 Key: KAFKA-305
 URL: https://issues.apache.org/jira/browse/KAFKA-305
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7, 0.8
Reporter: Prashanth Menon
Priority: Critical
 Attachments: KAFKA-305-v1.patch, KAFKA-305-v2.patch


 So it turns out that using the channel in SyncProducer the way we do for 
 blocking reads will not trigger socket timeouts (even though we set them) and 
 can block forever, which is bad. This bug report identifies the issue: 
 http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article 
 presents a potential work-around: 
 http://stackoverflow.com/questions/2866557/timeout-for-socketchannel
 The work-around is a simple solution that involves creating a 
 separate ReadableByteChannel instance for timeout-enabled reads.
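
 The linked work-around boils down to something like this sketch (host, port and
 readTimeoutMs are placeholders; the real fix is in the attached patches):

   import java.net.InetSocketAddress
   import java.nio.channels.{Channels, ReadableByteChannel, SocketChannel}

   // Blocking reads on a SocketChannel ignore SO_TIMEOUT (JDK bug 4614802), but
   // reads that go through a channel wrapped around the socket's InputStream do
   // honor it and throw SocketTimeoutException when the timeout expires.
   val channel = SocketChannel.open()
   channel.configureBlocking(true)
   channel.socket().setSoTimeout(readTimeoutMs)
   channel.connect(new InetSocketAddress(host, port))

   val writeChannel: SocketChannel = channel                     // writes still use the SocketChannel
   val readChannel: ReadableByteChannel =
     Channels.newChannel(channel.socket().getInputStream)        // timeout-enabled reads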

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

2012-03-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13237021#comment-13237021
 ] 

Jun Rao commented on KAFKA-305:
---

If this is indeed a ZK issue, we can probably check/wait that the ephemeral 
node is gone before restarting the broker.

 SyncProducer does not correctly timeout
 ---

 Key: KAFKA-305
 URL: https://issues.apache.org/jira/browse/KAFKA-305
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7, 0.8
Reporter: Prashanth Menon
Priority: Critical
 Attachments: KAFKA-305-v1.patch, KAFKA-305-v2.patch


 So it turns out that using the channel in SyncProducer the way we do for 
 blocking reads will not trigger socket timeouts (even though we set them) and 
 can block forever, which is bad. This bug report identifies the issue: 
 http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article 
 presents a potential work-around: 
 http://stackoverflow.com/questions/2866557/timeout-for-socketchannel
 The work-around is a simple solution that involves creating a 
 separate ReadableByteChannel instance for timeout-enabled reads.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

2012-03-22 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13235614#comment-13235614
 ] 

Jun Rao commented on KAFKA-305:
---

Prashanth,

2. If you want to make sure that a broker is shut down, you need to call 
kafkaServer.awaitShutdown after calling kafkaServer.shutdown. Overall, I don't 
quite understand how the new test works. It only brought down 1 broker and yet 
the comment says all brokers are down. If all brokers are indeed down, any RPC 
call to the broker should get a broken pipe or socket-closed exception 
immediately, not a SocketTimeoutException. So, to really test that 
the timeout works, we need to keep the broker alive and somehow delay the 
response from the server. This can probably be done with a mock request handler.
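
One simple way to exercise the read timeout without killing the broker is a
server that accepts the connection but never responds, e.g. (a sketch, not the
actual test code):

  import java.net.ServerSocket

  // Accept a connection and then stay silent; a SyncProducer pointed at this
  // port with a read timeout should fail with SocketTimeoutException instead
  // of blocking forever.
  val silentServer = new ServerSocket(0)          // 0 = pick any free port
  val acceptor = new Thread(new Runnable {
    def run() {
      val socket = silentServer.accept()          // hold the connection open, never write
      Thread.sleep(Long.MaxValue)
    }
  })
  acceptor.setDaemon(true)
  acceptor.start()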

 SyncProducer does not correctly timeout
 ---

 Key: KAFKA-305
 URL: https://issues.apache.org/jira/browse/KAFKA-305
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7, 0.8
Reporter: Prashanth Menon
Priority: Critical
 Attachments: KAFKA-305-v1.patch


 So it turns out that using the channel in SyncProducer the way we do for 
 blocking reads will not trigger socket timeouts (even though we set them) and 
 can block forever, which is bad. This bug report identifies the issue: 
 http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article 
 presents a potential work-around: 
 http://stackoverflow.com/questions/2866557/timeout-for-socketchannel
 The work-around is a simple solution that involves creating a 
 separate ReadableByteChannel instance for timeout-enabled reads.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-310) Incomplete message set validation checks in kafka.log.Log's append API can corrupt on disk log

2012-03-22 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13235636#comment-13235636
 ] 

Jun Rao commented on KAFKA-310:
---

ByteBufferMessageSet.validBytes currently makes a deep iteration of all 
messages, which means that we need to decompress messages. To avoid this 
overhead, we should change ByteBufferMessageSet.validBytes to use a shallow 
iterator.

 Incomplete message set validation checks in kafka.log.Log's append API can 
 corrupt on disk log
 --

 Key: KAFKA-310
 URL: https://issues.apache.org/jira/browse/KAFKA-310
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.7
Reporter: Neha Narkhede
Priority: Critical
 Attachments: kafka-310.patch


 The behavior of the ByteBufferMessageSet's iterator is to ignore, and return 
 false on, any trailing bytes that cannot be deserialized into a 
 Kafka message. The append API in Log iterates through a ByteBufferMessageSet 
 and validates the checksum of each message. However, while appending data to 
 the log, it just uses the underlying ByteBuffer that forms the 
 ByteBufferMessageSet. Now, due to some bug, if the ByteBuffer has some 
 trailing data, that will get appended to the on-disk log too. This can cause 
 corruption of the log.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-310) Incomplete message set validation checks in kafka.log.Log's append API can corrupt on disk log

2012-03-22 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13235675#comment-13235675
 ] 

Jun Rao commented on KAFKA-310:
---

+1 on v2.

 Incomplete message set validation checks in kafka.log.Log's append API can 
 corrupt on disk log
 --

 Key: KAFKA-310
 URL: https://issues.apache.org/jira/browse/KAFKA-310
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.7
Reporter: Neha Narkhede
Priority: Critical
 Attachments: kafka-310-v2.patch, kafka-310.patch


 The behavior of the ByteBufferMessageSet's iterator is to ignore, and return 
 false on, any trailing bytes that cannot be deserialized into a 
 Kafka message. The append API in Log iterates through a ByteBufferMessageSet 
 and validates the checksum of each message. However, while appending data to 
 the log, it just uses the underlying ByteBuffer that forms the 
 ByteBufferMessageSet. Now, due to some bug, if the ByteBuffer has some 
 trailing data, that will get appended to the on-disk log too. This can cause 
 corruption of the log.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-309) Bug in FileMessageSet's append API can corrupt on disk log

2012-03-22 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13235679#comment-13235679
 ] 

Jun Rao commented on KAFKA-309:
---

+1 on the patch.

 Bug in FileMessageSet's append API can corrupt on disk log
 --

 Key: KAFKA-309
 URL: https://issues.apache.org/jira/browse/KAFKA-309
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Critical
 Attachments: kafka-309-test.patch, kafka-309.patch


 In FileMessageSet's append API, we write a ByteBufferMessageSet to a log in 
 the following manner -
 while(written < messages.sizeInBytes)
   written += messages.writeTo(channel, 0, messages.sizeInBytes)
 In ByteBufferMessageSet, the writeTo API uses buffer.duplicate() to append to 
 a channel -
   def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long =
 channel.write(buffer.duplicate)
 If the channel doesn't write the ByteBuffer in one call, then we call it 
 again until sizeInBytes bytes are written. But the next call will use 
 buffer.duplicate() to write to the FileChannel, which will write the entire 
 ByteBufferMessageSet again to the file. 
 Effectively, we have a corrupted set of messages on disk. 
 Thinking about it, FileChannel is a blocking channel, so ideally, the entire 
 ByteBuffer should be written to the FileChannel in one call. I wrote a test 
 (attached here) and saw that it does. But I'm not sure whether there are 
 corner cases where it doesn't. In those cases, Kafka will end up 
 corrupting the on-disk log segment.
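
 In other words, if a partial write ever does happen, the loop has to resume from
 the bytes already written rather than from offset 0. Roughly (a sketch, not the
 attached patch, and it assumes writeTo actually honors its offset argument):

   // Resume a partial write where it left off instead of re-writing the whole buffer.
   var written = 0L
   while (written < messages.sizeInBytes)
     written += messages.writeTo(channel, written, messages.sizeInBytes - written)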

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-307) Refactor server code to remove interdependencies between LogManager and KafkaZooKeeper

2012-03-22 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13236175#comment-13236175
 ] 

Jun Rao commented on KAFKA-307:
---

Thanks for patch v2. Looks cleaner. A couple of other comments:

3. Is it better for KafkaZooKeeper to call ReplicaManager.addLocalReplica 
directly, instead of going through KafkaServer? For all replicas added in 
KafkaZookeeper, we know they are all local and should have a log. We could call 
LogManager.getOrCreateLog to get the log and pass it to ReplicaManager.

4. There are unused imports in KafkaServer.

 Refactor server code to remove interdependencies between LogManager and 
 KafkaZooKeeper
 --

 Key: KAFKA-307
 URL: https://issues.apache.org/jira/browse/KAFKA-307
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.7, 0.8
Reporter: Neha Narkhede
 Attachments: kafka-307-draft.patch, kafka-307-v2.patch


 Currently, LogManager wraps KafkaZooKeeper, which is meant to handle all ZooKeeper 
 interaction of a Kafka server. With replication, KafkaZooKeeper will handle 
 leader election, various state change listeners, and then start replicas. Due 
 to the interdependency between LogManager and KafkaZooKeeper, starting replicas 
 is not possible until LogManager starts up completely. As a result, we have 
 to separate out the broker startup procedures required for replication to get 
 around this problem.
 It will be good to refactor and clean up the server code, before diving 
 deeper into replication.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-300) Implement leader election

2012-03-20 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13233885#comment-13233885
 ] 

Jun Rao commented on KAFKA-300:
---

Thanks for the patch. Some comments:
1. KafkaZookeeper:
1.1 LeaderChangeListener.handleDataDeleted: no need to first check that the leader 
exists, since the leader could change immediately after the check is done and the 
subsequent logic still needs to handle this case anyway.
1.2 TopicChangeListener: curChilds returned from the callback gives all 
children, not just the new children.
1.3 SessionExpireListener: We could miss some new topics created when a 
session expires. Just calling subscribeToTopicAndPartitionsChanges may not be 
enough since we may need to call handleNewTopics for those missed new topics. 
1.4 leaderElection probably needs to return a boolean to indicate if the 
election succeeded or not.

2. LogManager:
2.1 It seems to me that replicas is not directly tied to LogManager and 
probably should be kept somewhere else, like KafkaServer. 
2.2 It also seems that we need to keep a map of (topic, partition_id, 
broker_id) => Replica so that we can address each replica more efficiently. 
This can be either a 1-level or a 2-level map (see the sketch after these comments).

3. DefaultEventHandler.handle: We probably should break out of the loop when 
outstandingProduceRequests is empty.

4. ZkUtils.waitUntilLeaderIsElected should be moved to a test utility (something like 
ZKTestUtil).

5. 
KafkaServer, BrokerPartitionInfo, DefaultEventHandler, LazyProducerTest, LeaderElectionTest:
 remove unused imports
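
For 2.2, the map could be as simple as the following sketch (Replica is the class
from the patch; the val names are hypothetical):

  import scala.collection.mutable

  // 1-level variant: (topic, partitionId, brokerId) -> Replica
  val replicasByKey = new mutable.HashMap[(String, Int, Int), Replica]

  // 2-level variant: (topic, partitionId) -> (brokerId -> Replica)
  val replicasByPartition = new mutable.HashMap[(String, Int), mutable.HashMap[Int, Replica]]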





 Implement leader election
 -

 Key: KAFKA-300
 URL: https://issues.apache.org/jira/browse/KAFKA-300
 Project: Kafka
  Issue Type: Sub-task
Reporter: Neha Narkhede
 Attachments: kafka-300.patch


 According to the Kafka replication design 
 (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication), this 
 JIRA will involve implementing the leaderElection() procedure. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

2012-03-20 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13233969#comment-13233969
 ] 

Jun Rao commented on KAFKA-305:
---

Prashanth,

Thanks for the patch. This is very useful. Some comments:

1. I think it makes sense for SimpleConsumer to use BlockingChannel as well. 
Could you change that in this patch too?
2. ProducerTest.testZKSendWithDeadBroker: This test doesn't really test the 
timeout on getting a response. We probably need to create a mock KafkaServer 
(one that doesn't send a response) to test this out.
3. BlockingChannel: 
3.1 We probably should rename timeoutMs to readTimeoutMs since only reads are 
subject to the timeout.
3.2 We should pass in a socketSendBufferSize and a socketReceiveBufferSize.
3.3 Should host and port be part of the constructor? It seems to me it's 
cleaner if each instance of BlockingChannel is tied to 1 host and 1 port.

I'd also be interested in your findings on the comparison with NIO with 
selectors.
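
Putting 3.1-3.3 together, the constructor shape being suggested is roughly (a
sketch of the review comments, not the committed class):

  class BlockingChannel(val host: String,
                        val port: Int,
                        val readTimeoutMs: Int,              // 3.1: only reads honor the timeout
                        val socketSendBufferSize: Int,       // 3.2
                        val socketReceiveBufferSize: Int) {  // 3.2
    // connect(), send(request) and receive() elided; each instance is tied to
    // a single host and port (3.3)
  }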


 SyncProducer does not correctly timeout
 ---

 Key: KAFKA-305
 URL: https://issues.apache.org/jira/browse/KAFKA-305
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7, 0.8
Reporter: Prashanth Menon
Priority: Critical
 Attachments: KAFKA-305-v1.patch


 So it turns out that using the channel in SyncProducer the way we do for 
 blocking reads will not trigger socket timeouts (even though we set them) and 
 can block forever, which is bad. This bug report identifies the issue: 
 http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article 
 presents a potential work-around: 
 http://stackoverflow.com/questions/2866557/timeout-for-socketchannel
 The work-around is a simple solution that involves creating a 
 separate ReadableByteChannel instance for timeout-enabled reads.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-45) Broker startup, leader election, becoming a leader/follower for intra-cluster replication

2012-03-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13228774#comment-13228774
 ] 

Jun Rao commented on KAFKA-45:
--

If there are 2 followers and the leader receives an ack from only follower 1, but not 
follower 2 (within the timeout), the leader will kick follower 2 out of the ISR before it 
commits the message and acks the producer. So, follower 2 will never get a 
chance to become the new leader should the current leader fail.

 Broker startup, leader election, becoming a leader/follower for intra-cluster 
 replication
 -

 Key: KAFKA-45
 URL: https://issues.apache.org/jira/browse/KAFKA-45
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Neha Narkhede

 We need to implement the logic for starting a broker with replicated 
 partitions, the leader election logic and how to become a leader and a 
 follower.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.

2012-03-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13228845#comment-13228845
 ] 

Jun Rao commented on KAFKA-49:
--

Prashanth,

I am ready to commit your patch. A couple things:

1. Your patch added a new class ResponseHandler, but it's not used. Should that 
be removed?
2. Your concern #1 is valid. Could you create another jira to track this?
3. For your concern #2, it's ok for getMetadataApi to return an empty leader in a 
transition state. The client will simply back off a bit and call getMetadataApi 
again.

 Add acknowledgement to the produce request.
 ---

 Key: KAFKA-49
 URL: https://issues.apache.org/jira/browse/KAFKA-49
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Prashanth Menon
 Fix For: 0.8

 Attachments: KAFKA-49-v1.patch, KAFKA-49-v2.patch


 Currently, the produce request doesn't get acknowledged. We need to have a 
 broker send a response to the producer and have the producer wait for the 
 response before sending the next request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-296) Update Go Client to new version of Go

2012-03-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13228865#comment-13228865
 ] 

Jun Rao commented on KAFKA-296:
---

I can't seem to apply the patch. Could you rebase?
patching file clients/go/tools/publisher/publisher.go
Hunk #1 FAILED at 22.
Hunk #2 succeeded at 87 with fuzz 2 (offset 38 lines).
1 out of 2 hunks FAILED -- saving rejects to file 
clients/go/tools/publisher/publisher.go.rej
patching file clients/go/tools/publisher/publisher.go
Hunk #1 FAILED at 23.
Hunk #2 FAILED at 41.
2 out of 2 hunks FAILED -- saving rejects to file 
clients/go/tools/publisher/publisher.go.rej


 Update Go Client to new version of Go
 -

 Key: KAFKA-296
 URL: https://issues.apache.org/jira/browse/KAFKA-296
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.7.1
Reporter: AaronR
 Attachments: go1updates.patch, kafka296-v2.patch


 Go (http://golang.org) is close to releasing a new version of Go (1.0), which 
 requires updates to the client; in the meantime most of the Go community has 
 already moved to this version.
 This change contains:
 * language changes to the existing client (os.Error, time, Signals, etc.)
 * removes the Makefiles (no longer used by Go)
 * runs go fmt (formatting to the standard Go style), which accounts for most of the 
 changed lines (spaces to tabs)
 * updates the import path to allow for go get installs (no need to fetch the 
 source and build)
 Not sure which versions this should apply to, but I think it should apply to 0.7 
 and newer.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.

2012-03-08 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13225321#comment-13225321
 ] 

Jun Rao commented on KAFKA-49:
--

Prashanth,

Thanks for the patch. Overall, it looks pretty good. Some comments:
1. KafkaApis: Even when the produce request requires no ack, we still need to 
send the error codes back. So we should always send a ProduceResponse; when no 
ack is specified, we probably don't need to send offsets back (a rough sketch 
follows after this list).
2. ProduceRequest.getNumMessages: rename it to something like 
getNumTopicPartitions.
3. AsyncProducerTest.testDefaultHandlerRetryLogic: doesn't really test retry.
4. SyncProducerTest.testProduceBlocksWhenRequired: use createTopic instead of 
creating the ZK path directly.
5. I agree that it's probably better to use Seq in our requests/responses, 
instead of Array. Then we need a Java version to convert Seq to a Java array 
and vice versa. Please open a separate jira to track this.
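
A rough sketch of item 1, using simplified stand-in types (the case class below 
is not the real kafka.api response class; field names and shapes are 
assumptions):

  // Stand-in response type for illustration only.
  case class ProduceResponse(errorCodes: Map[(String, Int), Short],
                             offsets: Map[(String, Int), Long])

  // Always build a response so that error codes reach the producer; only
  // include offsets when the request actually asked for an ack.
  def buildResponse(requiredAcks: Int,
                    errors: Map[(String, Int), Short],
                    appendedOffsets: Map[(String, Int), Long]): ProduceResponse = {
    val offsets =
      if (requiredAcks == 0) Map.empty[(String, Int), Long] else appendedOffsets
    ProduceResponse(errors, offsets)
  }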


 Add acknowledgement to the produce request.
 ---

 Key: KAFKA-49
 URL: https://issues.apache.org/jira/browse/KAFKA-49
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Prashanth Menon
 Fix For: 0.8

 Attachments: KAFKA-49-v1.patch


 Currently, the produce request doesn't get acknowledged. We need to have a 
 broker send a response to the producer and have the producer wait for the 
 response before sending the next request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.

2012-03-08 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13225845#comment-13225845
 ] 

Jun Rao commented on KAFKA-49:
--

1. Or we can just treat no acks the same as ack=1 for now.

 Add acknowledgement to the produce request.
 ---

 Key: KAFKA-49
 URL: https://issues.apache.org/jira/browse/KAFKA-49
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Prashanth Menon
 Fix For: 0.8

 Attachments: KAFKA-49-v1.patch


 Currently, the produce request doesn't get acknowledged. We need to have a 
 broker send a response to the producer and have the producer wait for the 
 response before sending the next request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-273) Occassional GZIP errors on the server while writing compressed data to disk

2012-03-07 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13224538#comment-13224538
 ] 

Jun Rao commented on KAFKA-273:
---

The patch for EOF looks fine. We probably need to do some system testing to make 
sure this doesn't introduce new problems, especially when the compressed size 
is relatively large. Once that testing is done, we can commit the patch.

 Occassional GZIP errors on the server while writing compressed data to disk
 ---

 Key: KAFKA-273
 URL: https://issues.apache.org/jira/browse/KAFKA-273
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Attachments: kafka-273.patch


 Occasionally, we see the following errors on the Kafka server -
 2012/02/08 14:58:21.832 ERROR [KafkaRequestHandlers] [kafka-processor-6] 
 [kafka] Error processing MultiProducerRequest on NusImpressionSetEvent:0
 java.io.EOFException: Unexpected end of ZLIB input stream
 at 
 java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:223)
 at 
 java.util.zip.InflaterInputStream.read(InflaterInputStream.java:141)
 at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:92)
 at java.io.FilterInputStream.read(FilterInputStream.java:90)
 at kafka.message.GZIPCompression.read(CompressionUtils.scala:52)
 at 
 kafka.message.CompressionUtils$$anonfun$decompress$1.apply$mcI$sp(CompressionUtils.scala:143)
 at 
 kafka.message.CompressionUtils$$anonfun$decompress$1.apply(CompressionUtils.scala:143)
 at 
 kafka.message.CompressionUtils$$anonfun$decompress$1.apply(CompressionUtils.scala:143)
 at 
 scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
 at 
 scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
 at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
 at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
 at 
 scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
 at 
 scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
 at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
 at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
 at scala.collection.immutable.Stream.foreach(Stream.scala:255)
 at 
 kafka.message.CompressionUtils$.decompress(CompressionUtils.scala:143)
 at 
 kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:119)
 at 
 kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:132)
 at 
 kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:81)
 at 
 kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
 at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
 at kafka.message.MessageSet.foreach(MessageSet.scala:87)
 at kafka.log.Log.append(Log.scala:204)
 at 
 kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:70)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:63)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:63)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
 at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
 at 
 kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:63)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:42)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:42)
 at kafka.network.Processor.handle(SocketServer.scala:297)
 at kafka.network.Processor.read(SocketServer.scala:320)
 at kafka.network.Processor.run(SocketServer.scala:215)
 at 

[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.

2012-03-06 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13223407#comment-13223407
 ] 

Jun Rao commented on KAFKA-49:
--

Prashanth,

Yes, that's actually a real bug. Instead of returning from inside the retry 
loop, we should just set a boolean to indicate that a send has succeeded. At 
the end, we will throw FailedToSendMessageException if the boolean is not set; 
otherwise, we will continue with the for loop. Could you file a separate jira 
to fix that? Thanks for catching the bug.
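
A minimal sketch of that retry shape, assuming a sendToBroker function that 
reports success; this is illustrative only, not the code that was eventually 
committed:

  class FailedToSendMessageException(msg: String) extends RuntimeException(msg)

  // Remember whether any attempt succeeded instead of returning from inside
  // the retry loop; only throw after all retries are exhausted.
  def sendWithRetries(numRetries: Int)(sendToBroker: () => Boolean) {
    var succeeded = false
    var attempt = 0
    while (!succeeded && attempt < numRetries) {
      succeeded = sendToBroker()          // true if this attempt worked
      attempt += 1
    }
    if (!succeeded)
      throw new FailedToSendMessageException(
        "Failed to send after " + numRetries + " retries")
  }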

 Add acknowledgement to the produce request.
 ---

 Key: KAFKA-49
 URL: https://issues.apache.org/jira/browse/KAFKA-49
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Prashanth Menon

 Currently, the produce request doesn't get acknowledged. We need to have a 
 broker send a response to the producer and have the producer wait for the 
 response before sending the next request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.

2012-03-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13222700#comment-13222700
 ] 

Jun Rao commented on KAFKA-49:
--

Prashanth,

That's a good question. Initially, I was just thinking that for async 
producers, if we get an error during send (after retries), we would just log 
the error without telling the client. Currently, the event handler is not 
really extensible. I can imagine that we add some kind of callback to return 
those errors. The question is what the client will do on those errors. Will it 
resend? If so, we will need to pass the failed requests through the callback 
too. I am curious about what other messaging systems like ActiveMQ do in async 
mode.
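
Purely as a sketch of what such a callback might look like; the trait name and 
signature below are hypothetical and do not exist in the codebase:

  // Hypothetical hook the async producer could invoke when a batch still
  // fails after retries, handing the failed events back to the client.
  trait SendErrorCallback[T] {
    def onSendFailure(failedEvents: Seq[T], cause: Throwable): Unit
  }

  // Example client implementation that only logs, mirroring today's behavior.
  class LoggingSendErrorCallback[T] extends SendErrorCallback[T] {
    def onSendFailure(failedEvents: Seq[T], cause: Throwable) {
      System.err.println("Dropping " + failedEvents.size + " events: " + cause)
    }
  }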

 Add acknowledgement to the produce request.
 ---

 Key: KAFKA-49
 URL: https://issues.apache.org/jira/browse/KAFKA-49
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Prashanth Menon

 Currently, the produce request doesn't get acknowledged. We need to have a 
 broker send a response to the producer and have the producer wait for the 
 response before sending the next request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.

2012-03-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13222779#comment-13222779
 ] 

Jun Rao commented on KAFKA-49:
--

The current DefaultEventHandler already refreshes topic metadata on retries. 
So, if we return any failed request to the client, there is probably not much 
the client can do, except for logging it. In any case, we should use a separate 
jira to track whether we need any async callback on the producer side.

 Add acknowledgement to the produce request.
 ---

 Key: KAFKA-49
 URL: https://issues.apache.org/jira/browse/KAFKA-49
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Prashanth Menon

 Currently, the produce request doesn't get acknowledged. We need to have a 
 broker send a response to the producer and have the producer wait for the 
 response before sending the next request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-03-02 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13221183#comment-13221183
 ] 

Jun Rao commented on KAFKA-240:
---

Joe, 

kafka-239 has been committed to the 0.8 branch. You are good to go. After 
rebasing your code, make sure all unit tests still pass.

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
  Labels: fetch, replication, wireprotocol
 Fix For: 0.8

 Attachments: KAFKA-240-FetchRequest-v1.patch, 
 KAFKA-240-FetchRequest-v2.patch, KAFKA-240-FetchRequest-validation-v1.patch, 
 KAFKA-240.ProducerRequest.v2.patch, KAFKA-240.ProducerRequest.v3.patch, 
 KAFKA-240.ProducerRequest.v4.patch, KAFKA-240.v3.patch, 
 kafka-240_unittestfix_delta.patch


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-03-02 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13221318#comment-13221318
 ] 

Jun Rao commented on KAFKA-240:
---

Yes, however, that information is not passed into ProduceRequest. So the server 
doesn't know the version of the producer client.

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
  Labels: fetch, replication, wireprotocol
 Fix For: 0.8

 Attachments: KAFKA-240-FetchRequest-v1.patch, 
 KAFKA-240-FetchRequest-v2.patch, KAFKA-240-FetchRequest-validation-v1.patch, 
 KAFKA-240.ProducerRequest.v2.patch, KAFKA-240.ProducerRequest.v3.patch, 
 KAFKA-240.ProducerRequest.v4.patch, KAFKA-240.v3.patch, 
 kafka-240_unittestfix_delta.patch


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-239) Wire existing producer and consumer to use the new ZK data structure

2012-03-01 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13220155#comment-13220155
 ] 

Jun Rao commented on KAFKA-239:
---

Please address items 2 and 11. For 4, I am fine leaving the ZK check in 
LogManager for now. Please add a comment that this will be fixed later.

Unit tests seem to hang consistently for me; they seem to hang on 
testLatestOffsetResetForward(kafka.integration.AutoOffsetResetTest).

 Wire existing producer and consumer to use the new ZK data structure
 

 Key: KAFKA-239
 URL: https://issues.apache.org/jira/browse/KAFKA-239
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
Assignee: Neha Narkhede
 Attachments: kafka-239-latest-revision-unit-tests-passing.patch, 
 kafka-239-v2.patch, kafka-239-v3.patch


 We can assume the leader of a partition is always the first replica. Data 
 will only be stored in the first replica. So, there is no fault-tolerance 
 support yet. Just make the partition logical.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-50) kafka intra-cluster replication support

2012-03-01 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-50?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13220290#comment-13220290
 ] 

Jun Rao commented on KAFKA-50:
--

Yes, we can start a new wiki for each of the sub-jiras, if needed.

 kafka intra-cluster replication support
 ---

 Key: KAFKA-50
 URL: https://issues.apache.org/jira/browse/KAFKA-50
 Project: Kafka
  Issue Type: New Feature
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8

 Attachments: kafka_replication_detailed_design_v2.pdf, 
 kafka_replication_highlevel_design.pdf, kafka_replication_lowlevel_design.pdf


 Currently, Kafka doesn't have replication. Each log segment is stored in a 
 single broker. This limits both the availability and the durability of Kafka. 
 If a broker goes down, all log segments stored on that broker become 
 unavailable to consumers. If a broker dies permanently (e.g., disk failure), 
 all unconsumed data on that node is lost forever. Our goal is to replicate 
 every log segment to multiple broker nodes to improve both the availability 
 and the durability. 
 We'd like to support the following in Kafka replication: 
 1. Configurable synchronous and asynchronous replication 
 2. Small unavailable window (e.g., less than 5 seconds) during broker 
 failures 
 3. Auto recovery when a failed broker rejoins 
 4. Balanced load when a broker fails (i.e., the load on the failed broker is 
 evenly spread among multiple surviving brokers)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-239) Wire existing producer and consumer to use the new ZK data structure

2012-02-29 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13219370#comment-13219370
 ] 

Jun Rao commented on KAFKA-239:
---

1. ZkUtils.getTopicPartitionsPath: There are a couple reasons why we chose to 
use /brokers/topics/partitions to store partition data, instead of 
/brokers/topics: (1) This distinguishes the ZK layout in 0.8 from 0.7. In 0.7, 
we already store broker ids directly under /brokers/topics. So there could be 
confusion if we put partition ids at the same level. (2) This also leaves room 
for future extension to add non-partition level per topic info in ZK.

2. ZkUtils: rename doesBrokerHostPartition to something like 
isPartitionOnBroker.

3. LogManager: logFlusherScheduler.startUp is called twice.

4. LogManager.getOrCreateLog: The LogManager shouldn't need to know anything 
about ZK. The check of whether a partition exists on a broker should be done at 
KafkaApi level. Ideally, we just want to check partition ownership from a local 
cache, which will be populated by ZK listeners (part of kafka-44). In this 
patch, we can either not check at all or directly check from ZK (with the 
intention to have it optimized in kafka-44).

5. KafkaServer: the log cleaner scheduler is scheduled in LogManager; should we 
start it up there too? If there is a good reason to do that, should we protect 
startUp from being called more than once?

6. KafkaScheduler: what's the benefit of having a startUp method? It seems 
creating the executor in the constructor is simpler.

7. Partition should be made logical. It only contains topic and partition id. 
There will be a new entity Replica which is associated with broker id.

8. ProducerPool: remove unused import and fix indentation in close()

9. AsyncProducerTest: There is duplicated code that sets up mocks to form the 
expected partition metadata. Can we have a separate method to share the code?

10. Producer: Is zkClient in the constructor just for testing? If so, add a 
comment to indicate that.

11. TestUtils: why do we need checkSetEqual? Doesn't Scala do that by default?

12. ZookeeperConsumerConnectorTest.testLeaderSelectionForPartition: is it really 
testing leader election? It seems to be testing partition ownership.

13. ZkUtils.getLeaderForPartition: This is probably fine for this patch. 
However, we should think whether it is better to get leader info from ZK 
directly or use the getMetaData api.


 Wire existing producer and consumer to use the new ZK data structure
 

 Key: KAFKA-239
 URL: https://issues.apache.org/jira/browse/KAFKA-239
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
Assignee: Neha Narkhede
 Attachments: kafka-239-latest-revision-unit-tests-passing.patch, 
 kafka-239-v2.patch


 We can assume the leader of a partition is always the first replica. Data 
 will only be stored in the first replica. So, there is no fault-tolerance 
 support yet. Just make the partition logical.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-279) kafka-console-producer does not take in customized values of --batch-size or --timeout

2012-02-27 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13217331#comment-13217331
 ] 

Jun Rao commented on KAFKA-279:
---

That typically means that you are sending data at a rate faster than the broker 
can persist it. Try increasing flush.interval to something like 1 on the 
broker to increase server throughput.

 kafka-console-producer does not take in customized values of --batch-size or 
 --timeout
 --

 Key: KAFKA-279
 URL: https://issues.apache.org/jira/browse/KAFKA-279
 Project: Kafka
  Issue Type: Bug
  Components: contrib
Affects Versions: 0.7
 Environment: Ubuntu 10.04, openjdk1.6 with default installation of 0.7
Reporter: milind parikh
Priority: Minor
 Attachments: kafka-279.patch


 1. While the default console-producer, console-consumer paradigm works great, 
 when I try modiying the batch size
 bin/kafka-console-producer.sh --batch-size 300   --zookeeper localhost:2181 
 --topic test1
 it gives me a
 Exception in thread "main" java.lang.NumberFormatException: null
 at java.lang.Integer.parseInt(Integer.java:443)
 at java.lang.Integer.parseInt(Integer.java:514)
 at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
 at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
 at kafka.utils.Utils$.getIntInRange(Utils.scala:189)
 at kafka.utils.Utils$.getInt(Utils.scala:174)
 at 
 kafka.producer.async.AsyncProducerConfigShared$class.$init$(AsyncProducerConfig.scala:45)
 at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:25)
 at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:108)
 at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
 I have looked at the code and can't figure out what's wrong
 2. When I do bin/kafka-console-producer.sh --timeout 3   --zookeeper 
 localhost:2181 --topic test1
 I would think that console-producer would wait for 30s if the batch size 
 (default 200) is not full. It doesn't. It takes the same time without the 
 timeout parameter (default 1000) and dumps whatever the batch size.
 Resolution from Jun
 1. The code does the following to set the batch size:
  props.put("batch.size", batchSize)
 Instead, it should do
  props.put("batch.size", batchSize.toString)
 2. It sets the wrong property name for the timeout. Instead of doing
    props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
 it should do
    props.put("queue.time", sendTimeout.toString)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-286) consumer sometimes don't release partition ownership properly in ZK during rebalance

2012-02-27 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13217375#comment-13217375
 ] 

Jun Rao commented on KAFKA-286:
---

Will this happen? It doesn't seem possible to me. In step 2, when we release 
0-0 and 0-1 during rebalance, we clear topicRegistry. Since this rebalance 
fails, topicRegistry will not be populated. So, in step 4, there is nothing to 
release for c1.

 consumer sometimes don't release partition ownership properly in ZK during 
 rebalance
 

 Key: KAFKA-286
 URL: https://issues.apache.org/jira/browse/KAFKA-286
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.7.1

 Attachments: kafka-286.patch, kafka-286_v2.patch




--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-249) Separate out Kafka mirroring into a stand-alone app

2012-02-27 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13217513#comment-13217513
 ] 

Jun Rao commented on KAFKA-249:
---

Could we support subscribing to wildcard topics directly in consumerConnector? 
We could add a new api like the following:

createMessageStream(topicRegex: String) : KafkaMessageStream

All topics that match topicRegex will be returned in a single message stream, 
which can then be iterated.
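
Hypothetical usage of such an api (createMessageStream(topicRegex) is the 
proposal above; the connector, the regex, and the process function are 
placeholders, not existing code):

  // Sketch only: one combined stream for every topic matching the regex.
  val stream = consumerConnector.createMessageStream("mirror\\..*")
  for (message <- stream)    // iterate the single combined stream
    process(message)         // process(...) stands in for client logic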

 Separate out Kafka mirroring into a stand-alone app
 ---

 Key: KAFKA-249
 URL: https://issues.apache.org/jira/browse/KAFKA-249
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy
 Fix For: 0.7.1

 Attachments: KAFKA-249.v1.patch


 I would like to discuss on this jira, the feasibility/benefits of separating
 out Kafka's mirroring feature from the broker into a stand-alone app, as it
 currently has a couple of limitations and issues.
 For example, we recently had to deal with Kafka mirrors that were in fact
 idle due to the fact that mirror threads were not created at start-up due to
 a rebalancing exception, but the Kafka broker itself did not shutdown. This
 has since been fixed, but is indicative of (avoidable) problems in embedding
 non-broker specific features in the broker.
 Logically, it seems to make sense to separate it out to achieve better
 division of labor.  Furthermore, enhancements to mirroring may be less
 clunky to implement and use with a stand-alone app.  For example to support
 custom partitioning on the target cluster, or to mirror from multiple
 clusters we would probably need to be able to pass in multiple embedded
 consumer/embedded producer configs, which would be less ugly if the
 mirroring process were a stand-alone app.  Also, if we break it out, it
 would be convenient to use as a consumption engine for the console
 consumer which will make it easier to add on features such as wildcards in
 topic consumption, since it contains a ZooKeeper topic discovery component.
 Any suggestions and/or objections to this?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-44) Various ZK listeners to support intra-cluster replication

2012-02-24 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-44?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13215728#comment-13215728
 ] 

Jun Rao commented on KAFKA-44:
--

It may be possible, but this can be a bit tricky. When you move a partition, 
you want to wait until the partition in the new broker has fully caught up, 
before deleting the one on the old broker. So, one way to achieve this is to 
have another ZK path that indicates this transition state. After the transition 
is done, the new assignment will be added to the assigned_partition path.

In any case, let's start by just focusing on static partition assignment. We 
can worry about partition reassignment a bit later when we get to kafka-42.

 Various ZK listeners to support intra-cluster replication
 -

 Key: KAFKA-44
 URL: https://issues.apache.org/jira/browse/KAFKA-44
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao

 We need to implement the new ZK listeners for the new paths registered in ZK.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-45) Broker startup, leader election, becoming a leader/follower for intra-cluster replication

2012-02-21 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13213033#comment-13213033
 ] 

Jun Rao commented on KAFKA-45:
--

Prashanth,

That's a good question. We'd like to make replicas identical with each other. 
This is easy for size-based log retention, but harder for time-based log 
retention. What you proposed is to have only the leader delete old log segments 
and propagate this information to the followers. This seems like a reasonable 
approach. The question is how should the leader communicate such information to 
the followers. One possibility is to piggyback on the FetchResponse returned to 
the followers. This will mean some extra optional fields in FetchResponse.

 Broker startup, leader election, becoming a leader/follower for intra-cluster 
 replication
 -

 Key: KAFKA-45
 URL: https://issues.apache.org/jira/browse/KAFKA-45
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao

 We need to implement the logic for starting a broker with replicated 
 partitions, the leader election logic and how to become a leader and a 
 follower.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-44) Various ZK listeners to support intra-cluster replication

2012-02-21 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-44?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13213283#comment-13213283
 ] 

Jun Rao commented on KAFKA-44:
--

Not sure if I follow exactly what you are proposing here. In particular, what 
does the broker recreate? My thinking is the following:

/brokers/[brokerId]/new_partition/[topic:partitionId] is the source of truth 
about what topic/partitions a broker has and that path is only created during 
topic creation. A broker listens to that path to pick up new topic/partition 
assigned to it. When a broker starts up, it simply reads all 
/brokers/[brokerId]/new_partition/[topic:partitionId] to determine the set of 
topic/partition that it should have. So the broker never needs to recreate that 
path.
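
A rough sketch of that startup read using the zkclient library (the path layout 
follows the description above; the helper name and the "topic:partition" child 
parsing are assumptions):

  import org.I0Itec.zkclient.ZkClient
  import scala.collection.JavaConversions._

  // On startup, list every child of /brokers/[brokerId]/new_partition to learn
  // which topic/partitions this broker should host; children look like "topic:0".
  def partitionsForBroker(zkClient: ZkClient, brokerId: Int): Seq[(String, Int)] = {
    val path = "/brokers/" + brokerId + "/new_partition"
    val children: Seq[String] = zkClient.getChildren(path)
    children.map { child =>
      val Array(topic, partition) = child.split(":")
      (topic, partition.toInt)
    }
  }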

 Various ZK listeners to support intra-cluster replication
 -

 Key: KAFKA-44
 URL: https://issues.apache.org/jira/browse/KAFKA-44
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao

 We need to implement the new ZK listeners for the new paths registered in ZK.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-277) Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function

2012-02-20 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13212133#comment-13212133
 ] 

Jun Rao commented on KAFKA-277:
---

Some comments:
1. The variable event just binds to every item in the sequence by the foreach 
method. There is no need to rename it since each item in processedEvents is 
supposed to be a single event.
2. In ByteBufferMessageSet, instead of duplicating code in shallowIterator, 
could we rename deepIterator to internalIterator and add a flag to control 
whether we want to do shallow iteration or deep iteration? In general, we don't 
want to expose the shallow iterator externally. So, it's better if we just add 
a verifyMessageSize method in ByteBufferMessageSet that uses shallow iterator.
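
To illustrate the single-internal-iterator idea on a toy model (this is not 
ByteBufferMessageSet; the nested structure below is just an assumption for the 
example):

  // Toy model: a message may wrap "compressed" inner messages.
  case class Msg(payload: String, inner: Seq[Msg] = Nil)

  // One internal iterator controlled by a flag, instead of two near-copies.
  def internalIterator(msgs: Seq[Msg], isShallow: Boolean): Iterator[Msg] =
    if (isShallow)
      msgs.iterator                         // wrappers only, no decompression
    else
      msgs.iterator.flatMap { m =>
        if (m.inner.isEmpty) Iterator(m) else m.inner.iterator
      }

  // A verifyMessageSize-style check built on the shallow iterator.
  def verifyMessageSize(msgs: Seq[Msg], maxSize: Int): Boolean =
    internalIterator(msgs, isShallow = true).forall(_.payload.length <= maxSize)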
 

 Add a shallow iterator to the ByteBufferMessageSet, which is only used in 
 SynchProducer.verifyMessageSize() function
 

 Key: KAFKA-277
 URL: https://issues.apache.org/jira/browse/KAFKA-277
 Project: Kafka
  Issue Type: Bug
Reporter: Yang Ye
 Attachments: shallow_iterator.patch


 A shallow iterator just traverses the first-level messages of a 
 ByteBufferMessageSet; compressed messages won't be decompressed and treated 
 individually.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-268) Add another reconnection condition to the syncProducer: the time elapsed since last connection

2012-02-20 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13212169#comment-13212169
 ] 

Jun Rao commented on KAFKA-268:
---

Committed the improvement to trunk.

 Add  another reconnection condition to the syncProducer:  the time elapsed 
 since last connection
 

 Key: KAFKA-268
 URL: https://issues.apache.org/jira/browse/KAFKA-268
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Yang Ye
Priority: Minor
 Fix For: 0.7.1

 Attachments: kafka-reconnect-time.patch, 
 random_last_connection_time.patch, time_based_reconnect_feature.patch


 Add  another reconnection condition to the syncProducer:  the time elapsed 
 since last connection. If it's larger than the pre-specified threshold, close 
 the connection and reopen it. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-268) Add another reconnection condition to the syncProducer: the time elapsed since last connection

2012-02-17 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13210382#comment-13210382
 ] 

Jun Rao commented on KAFKA-268:
---

A suggestion: It's possible that we have a number of producer clients all 
started at around the same time. Then they will all reconnect at almost exactly 
the same time. This is probably not good for load balancing. What we want is to 
spread the reconnects. One way to do that is to initialize lastConnectionTime 
to a random timestamp between (now - reconnectTimeInterval) and now.

 Add  another reconnection condition to the syncProducer:  the time elapsed 
 since last connection
 

 Key: KAFKA-268
 URL: https://issues.apache.org/jira/browse/KAFKA-268
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Yang Ye
Priority: Minor
 Fix For: 0.7.1

 Attachments: kafka-reconnect-time.patch, 
 time_based_reconnect_feature.patch


 Add  another reconnection condition to the syncProducer:  the time elapsed 
 since last connection. If it's larger than the pre-specified threshold, close 
 the connection and reopen it. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-268) Add another reconnection condition to the syncProducer: the time elapsed since last connection

2012-02-17 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13210620#comment-13210620
 ] 

Jun Rao commented on KAFKA-268:
---

How about the following?
private var lastConnectionTime = System.currentTimeMillis - 
SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval

 Add  another reconnection condition to the syncProducer:  the time elapsed 
 since last connection
 

 Key: KAFKA-268
 URL: https://issues.apache.org/jira/browse/KAFKA-268
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Yang Ye
Priority: Minor
 Fix For: 0.7.1

 Attachments: kafka-reconnect-time.patch, 
 random_last_connection_time.patch, time_based_reconnect_feature.patch


 Add  another reconnection condition to the syncProducer:  the time elapsed 
 since last connection. If it's larger than the pre-specified threshold, close 
 the connection and reopen it. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-265) Add a queue of zookeeper notifications in the zookeeper consumer to reduce the number of rebalancing attempts

2012-02-16 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13209992#comment-13209992
 ] 

Jun Rao commented on KAFKA-265:
---

Committed the fix for both Condition and shutdown to trunk.

 Add a queue of zookeeper notifications in the zookeeper consumer to reduce 
 the number of rebalancing attempts
 -

 Key: KAFKA-265
 URL: https://issues.apache.org/jira/browse/KAFKA-265
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Jun Rao
 Fix For: 0.7.1

 Attachments: kafka-265.patch, kafka-265_await_fix.patch, 
 kafka-265_shutdown.patch, kafka-265_v2.patch

   Original Estimate: 168h
  Remaining Estimate: 168h

 The correct fix for KAFKA-262 and other known issues with the current 
 consumer rebalancing approach, is to get rid of the cache in the zookeeper 
 consumer. 
 The side-effect of that fix, though, is the large number of zookeeper 
 notifications that will trigger a full rebalance operation on the consumer. 
 Ideally, the zookeeper notifications can be batched and only one rebalance 
 operation can be triggered for several such ZK notifications. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-274) Handle corrupted messages cleanly

2012-02-16 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13209993#comment-13209993
 ] 

Jun Rao commented on KAFKA-274:
---

+1 for the patch.

 Handle corrupted messages cleanly
 -

 Key: KAFKA-274
 URL: https://issues.apache.org/jira/browse/KAFKA-274
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Fix For: 0.7.1

 Attachments: KAFKA-274.patch, KAFKA-274_v2.patch


 This is related to KAFKA-273 and is filed to improve the code to have more 
 defensive checks against corrupted data. The data could get corrupted either 
 due to a Kafka bug or due to bit flipping on the network. The message 
 iterator should check if the message is valid before trying to decompress it. 
 On the producer side, defensive checks can be turned on to verify the 
 messages before writing those to the socket.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-02-15 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13208691#comment-13208691
 ] 

Jun Rao commented on KAFKA-240:
---

Thanks Prashanth, excellent patch. Committed Validation v1 patch to 0.8.

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
  Labels: fetch, replication, wireprotocol
 Fix For: 0.8

 Attachments: KAFKA-240-FetchRequest-v1.patch, 
 KAFKA-240-FetchRequest-v2.patch, KAFKA-240-FetchRequest-validation-v1.patch, 
 KAFKA-240.v3.patch


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-02-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13206927#comment-13206927
 ] 

Jun Rao commented on KAFKA-240:
---

Prashanth,

Another thing. We don't enforce that a given topic appears in at most 1 
OffsetDetail in a FetchRequest. We will need to do that since FetchResponse 
code assumes that. 

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
  Labels: fetch, replication, wireprotocol
 Fix For: 0.8

 Attachments: KAFKA-240-FetchRequest-v1.patch, 
 KAFKA-240-FetchRequest-v2.patch, KAFKA-240.v3.patch


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-268) Add another reconnection condition to the syncProducer: the time elapsed since last connection

2012-02-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13206955#comment-13206955
 ] 

Jun Rao commented on KAFKA-268:
---

Some comments:
1. SyncProducerConfigShared: change reconnectTimeInterval to 
reconnectTimeIntervalMs and reconnect.timeInterval to 
reconnect.time.interval.ms, to be consistent with existing naming convention.
2. There is no real change to ByteBufferMessageSet and Utils. Please revert 
them.
3. The default value of reconnectTimeInterval is too small. Let's set it to 10 
minutes. Also, we probably need a way to turn off time-based reconnect. How 
about using a negative value to indicate that? Let's also add a comment in the 
code to document that.
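
For item 3, a small sketch of how the check could read under the assumption 
that a negative interval disables time-based reconnect (names are illustrative):

  // Reconnect only when the feature is enabled (interval >= 0) and the
  // connection is older than the configured interval.
  def shouldReconnect(lastConnectionTimeMs: Long,
                      nowMs: Long,
                      reconnectTimeIntervalMs: Long): Boolean =
    reconnectTimeIntervalMs >= 0 &&
      (nowMs - lastConnectionTimeMs) > reconnectTimeIntervalMs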

 Add  another reconnection condition to the syncProducer:  the time elapsed 
 since last connection
 

 Key: KAFKA-268
 URL: https://issues.apache.org/jira/browse/KAFKA-268
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Yang Ye
Priority: Minor
 Attachments: kafka-reconnect-time.patch


 Add  another reconnection condition to the syncProducer:  the time elapsed 
 since last connection. If it's larger than the pre-specified threshold, close 
 the connection and reopen it. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-270) sync producer / consumer test producing lot of kafka server exceptions not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html

2012-02-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13207023#comment-13207023
 ] 

Jun Rao commented on KAFKA-270:
---

A few things to try:

1. It seems that there is ZK session expiration in the client. This should be 
rare. If it's frequent, it's very likely caused by client GC. Please check your 
GC log.
2. Enable debug level logging in FileMessageSet in the broker. You will see the 
flush time for each log write. See if the flush time is reasonable (typically 
low 10s of ms) since it controls how many IOs a broker can do per second. 

  sync producer / consumer test producing lot of kafka server exceptions  not 
 getting the throughput mentioned here 
 http://incubator.apache.org/kafka/performance.html
 --

 Key: KAFKA-270
 URL: https://issues.apache.org/jira/browse/KAFKA-270
 Project: Kafka
  Issue Type: Bug
  Components: clients, core
Affects Versions: 0.7
 Environment: Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 
 GNU/Linux 
 ext3 file system with raid10
Reporter: Praveen Ramachandra
  Labels: clients, core, newdev, performance

 I am getting ridiculously low producer and consumer throughput.
 I am using default config values for producer, consumer and broker
 which are very good starting points, as they should yield sufficient
 throughput.
 Appreciate if you point what settings/changes-in-code needs to be done
 to get higher throughput.
 I changed num.partitions in the server.config to 10.
 Please look below for exception and error messages from the server
 BTW: I am running server, zookeeper, producer, consumer on the same host.
 =Consumer Code=
 long startTime = System.currentTimeMillis();
 long endTime = startTime + runDuration * 1000L;
 Properties props = new Properties();
 props.put("zk.connect", "localhost:2181");
 props.put("groupid", subscriptionName); // to support multiple subscribers
 props.put("zk.sessiontimeout.ms", "400");
 props.put("zk.synctime.ms", "200");
 props.put("autocommit.interval.ms", "1000");
 consConfig = new ConsumerConfig(props);
 consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
 topicCountMap.put(topicName, new Integer(1)); // the topic to subscribe to
 Map<String, List<KafkaMessageStream<Message>>> consumerMap =
     consumer.createMessageStreams(topicCountMap);
 KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
 ConsumerIterator<Message> it = stream.iterator();
 while (System.currentTimeMillis() <= endTime)
 {
     it.next(); // discard data
     consumeMsgCount.incrementAndGet();
 }
 =End consumer CODE=
 =Producer CODE=
 props.put("serializer.class", "kafka.serializer.StringEncoder");
 props.put("zk.connect", "localhost:2181");
 // Use the random partitioner. Don't need the key type; just set it to Integer.
 // The message is of type String.
 producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
 long endTime = startTime + runDuration * 1000L; // run duration is in seconds
 while (System.currentTimeMillis() <= endTime)
 {
     String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
     producer.send(new ProducerData<Integer, String>(topicName, msg));
     pc.incrementAndGet();
 }
 java.util.Date date = new java.util.Date(System.currentTimeMillis());
 System.out.println(date + " :: stopped producer for topic " + topicName);
 =END Producer CODE=
 I see a bunch of exceptions like this
 [2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of 
 error (kafka.network.Processor)
 java.io.IOException: Connection reset by peer
   at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
   at 
 sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
   at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
   at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
   at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
   at kafka.network.MultiSend.writeTo(Transmission.scala:95)
   at kafka.network.Processor.write(SocketServer.scala:332)
   at kafka.network.Processor.run(SocketServer.scala:209)
   at java.lang.Thread.run(Thread.java:662)
 java.io.IOException: Connection reset by peer
   at 

[jira] [Commented] (KAFKA-47) Create topic support and new ZK data structures for intra-cluster replication

2012-02-08 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-47?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13203725#comment-13203725
 ] 

Jun Rao commented on KAFKA-47:
--

Prashanth, good question.

Yes, we could continue using the current log structure 
{log.dir}/topicname-partitionid. The only thing is that we would like to store 
some per topic metadata on disk, e.g., the version id (creation time) of a 
topic (to deal with some of the edge cases during topic re-creation). With the 
current structure, we either have to duplicate the topic metadata in each 
partition directory or deterministically pick one partition (like the smallest 
one) to store the metadata. Neither is ideal. It's much cleaner if we use the 
new structure {log.dir}/topicname/partitionid. Then the topic metadata can be 
stored under {log.dir}/topicname.

 Create topic support and new ZK data structures for intra-cluster replication
 -

 Key: KAFKA-47
 URL: https://issues.apache.org/jira/browse/KAFKA-47
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao

 We need the DDL syntax for creating new topics. May need to use things like 
 javaCC. Also, we need to register new data structures in ZK accordingly.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-262) Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own

2012-02-08 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13203748#comment-13203748
 ] 

Jun Rao commented on KAFKA-262:
---

Some comments:

1. In ZookeeperConsumerConnector.reflectPartitionOwnershipDecision, the local 
variable success is not intuitive. It should be renamed to something like 
hasFailure.

2. In ZookeeperConsumerConnector.releasePartitionOwnership: it's not clear to 
me why this method has to take an input parameter. Wouldn't it be simpler to 
always release partition ownership according to topicRegistry?
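
A sketch of the simplification in item 2 (the registry shape and the ownership 
path are assumptions, not the actual consumer code):

  import org.I0Itec.zkclient.ZkClient

  // Release ownership for exactly what topicRegistry says we own, rather
  // than for a caller-supplied list. The path layout is illustrative.
  def releasePartitionOwnership(zkClient: ZkClient,
                                group: String,
                                topicRegistry: Map[String, Set[Int]]) {
    for ((topic, partitions) <- topicRegistry; partition <- partitions)
      zkClient.delete("/consumers/" + group + "/owners/" + topic + "/" + partition)
  }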



 Bug in the consumer rebalancing logic causes one consumer to release 
 partitions that it does not own
 

 Key: KAFKA-262
 URL: https://issues.apache.org/jira/browse/KAFKA-262
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Fix For: 0.7.1

 Attachments: kafka-262.patch

   Original Estimate: 24h
  Remaining Estimate: 24h

 The consumer maintains a cache of topics and partitions it owns along with 
 the fetcher queues corresponding to those. But while releasing partition 
 ownership, this cache is not cleared. This leads the consumer to release a 
 partition that it does not own any more. This can also lead the consumer to 
 commit offsets for partitions that it no longer consumes from. 
 The rebalance operation goes through following steps -
 1. close fetchers
 2. commit offsets
 3. release partition ownership. 
 4. rebalance, add topic, partition and fetcher queues to the topic registry, 
 for all topics that the consumer process currently wants to own. 
 5. If the consumer runs into conflict for one topic or partition, the 
 rebalancing attempt fails, and it goes to step 1.
 Say, there are 2 consumers in a group, c1 and c2. Both are consuming topic1 
 with partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1 and c2 owns 1-0.
 1. Broker 1 goes down. This triggers rebalancing attempt in c1 and c2.
 2. c1 releases partition ownership and, during step 4 (above), fails to 
 rebalance.
 3. Meanwhile, c2 completes rebalancing successfully, and owns partition 0-1 
 and starts consuming data.
 4. c1 starts next rebalancing attempt and during step 3 (above), it releases 
 partition 0-1. During step 4, it owns partition 0-0 again, and starts 
 consuming data.
 5. Effectively, rebalancing has completed successfully, but there is no owner 
 for partition 0-1 registered in Zookeeper.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-262) Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own

2012-02-08 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13204140#comment-13204140
 ] 

Jun Rao commented on KAFKA-262:
---

2. If releasePartitionOwnership always just checks topicRegistry, the code and 
the logic will be a bit simpler. Could we run the system test and see if there 
is any issue with the simplification?

 Bug in the consumer rebalancing logic causes one consumer to release 
 partitions that it does not own
 

 Key: KAFKA-262
 URL: https://issues.apache.org/jira/browse/KAFKA-262
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Fix For: 0.7.1

 Attachments: kafka-262.patch

   Original Estimate: 24h
  Remaining Estimate: 24h

 The consumer maintains a cache of topics and partitions it owns along with 
 the fetcher queues corresponding to those. But while releasing partition 
 ownership, this cache is not cleared. This leads the consumer to release a 
 partition that it does not own any more. This can also lead the consumer to 
 commit offsets for partitions that it no longer consumes from. 
 The rebalance operation goes through the following steps -
 1. close fetchers
 2. commit offsets
 3. release partition ownership. 
 4. rebalance, add topic, partition and fetcher queues to the topic registry, 
 for all topics that the consumer process currently wants to own. 
 5. If the consumer runs into conflict for one topic or partition, the 
 rebalancing attempt fails, and it goes to step 1.
 Say, there are 2 consumers in a group, c1 and c2. Both are consuming topic1 
 with partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1 and c2 owns 1-0.
 1. Broker 1 goes down. This triggers rebalancing attempt in c1 and c2.
 2. c1 releases partition ownership and, during step 4 (above), fails to rebalance.
 3. Meanwhile, c2 completes rebalancing successfully, and owns partition 0-1 
 and starts consuming data.
 4. c1 starts next rebalancing attempt and during step 3 (above), it releases 
 partition 0-1. During step 4, it owns partition 0-0 again, and starts 
 consuming data.
 5. Effectively, rebalancing has completed successfully, but there is no owner 
 for partition 0-1 registered in Zookeeper.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-02-08 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13204170#comment-13204170
 ] 

Jun Rao commented on KAFKA-240:
---

Prashanth, thanks for the patch. Looks good overall.
1. FetchResponse: Since we already have error code at the PartitionData level, 
should we get rid of the error code at the FetchResponse level? There is no 
obvious use case for the latter.
2. Should FetchResponse.messageSet return ByteBufferMessageSet? That way the caller 
doesn't have to cast (see the sketch below).
3. Could you rebase to the latest 0.8 branch?
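
For illustration, a minimal sketch of suggestions 1 and 2 (the field names are assumptions, not the actual KAFKA-240 classes): the error code lives only on the per-partition data, and messageSet returns the concrete ByteBufferMessageSet so callers don't cast.

  import kafka.message.ByteBufferMessageSet

  case class PartitionData(error: Short, messages: ByteBufferMessageSet)

  case class FetchResponse(data: Map[(String, Int), PartitionData]) {
    // typed accessor: no casting needed on the caller side
    def messageSet(topic: String, partition: Int): ByteBufferMessageSet =
      data((topic, partition)).messages
  }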

Joe, do you think we can take Prashanth's patch first and apply your patch on 
top later?

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
  Labels: fetch, replication, wireprotocol
 Fix For: 0.8

 Attachments: KAFKA-240-FetchRequest-v1.patch


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-262) Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own

2012-02-08 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13204203#comment-13204203
 ] 

Jun Rao commented on KAFKA-262:
---

In releasePartitionOwnership(), topicAndPartitionsToBeReleased is no longer 
used and should be removed. Otherwise, the patch looks good.

 Bug in the consumer rebalancing logic causes one consumer to release 
 partitions that it does not own
 

 Key: KAFKA-262
 URL: https://issues.apache.org/jira/browse/KAFKA-262
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Fix For: 0.7.1

 Attachments: kafka-262-v3.patch, kafka-262.patch

   Original Estimate: 24h
  Remaining Estimate: 24h

 The consumer maintains a cache of topics and partitions it owns along with 
 the fetcher queues corresponding to those. But while releasing partition 
 ownership, this cache is not cleared. This leads the consumer to release a 
 partition that it does not own any more. This can also lead the consumer to 
 commit offsets for partitions that it no longer consumes from. 
 The rebalance operation goes through the following steps -
 1. close fetchers
 2. commit offsets
 3. release partition ownership. 
 4. rebalance, add topic, partition and fetcher queues to the topic registry, 
 for all topics that the consumer process currently wants to own. 
 5. If the consumer runs into conflict for one topic or partition, the 
 rebalancing attempt fails, and it goes to step 1.
 Say, there are 2 consumers in a group, c1 and c2. Both are consuming topic1 
 with partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1 and c2 owns 1-0.
 1. Broker 1 goes down. This triggers rebalancing attempt in c1 and c2.
 2. c1 releases partition ownership and, during step 4 (above), fails to rebalance.
 3. Meanwhile, c2 completes rebalancing successfully, and owns partition 0-1 
 and starts consuming data.
 4. c1 starts next rebalancing attempt and during step 3 (above), it releases 
 partition 0-1. During step 4, it owns partition 0-0 again, and starts 
 consuming data.
 5. Effectively, rebalancing has completed successfully, but there is no owner 
 for partition 0-1 registered in Zookeeper.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-02-08 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13204258#comment-13204258
 ] 

Jun Rao commented on KAFKA-240:
---

Also, regarding the builder for FetchRequest: my concern is that it's not clear what 
the required parameters are.
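
One way to address this, sketched loosely (the names below are illustrative, not the actual FetchRequest builder): required parameters go to the builder's constructor, and only the optional ones get setter-style methods.

  case class PartitionFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)
  case class FetchRequestSketch(clientId: String, replicaId: Int,
                                maxWait: Int, minBytes: Int, fetches: List[PartitionFetch])

  class FetchRequestBuilder(clientId: String, replicaId: Int) {   // required, up front
    private var maxWaitMs = 0                                     // optional, defaulted
    private var minFetchBytes = 0
    private var fetches = List[PartitionFetch]()

    def maxWait(ms: Int): this.type = { maxWaitMs = ms; this }
    def minBytes(bytes: Int): this.type = { minFetchBytes = bytes; this }
    def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int): this.type = {
      fetches = PartitionFetch(topic, partition, offset, fetchSize) :: fetches; this
    }
    def build() = FetchRequestSketch(clientId, replicaId, maxWaitMs, minFetchBytes, fetches)
  }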

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
  Labels: fetch, replication, wireprotocol
 Fix For: 0.8

 Attachments: KAFKA-240-FetchRequest-v1.patch


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-253) Refactor the async producer to have only one queue instead of one queue per broker in a Kafka cluster

2012-02-07 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13203209#comment-13203209
 ] 

Jun Rao commented on KAFKA-253:
---

Thanks for the review. Committed to 0.8 branch. Will close the jira once it's 
committed to 0.7 branch.

 Refactor the async producer to have only one queue instead of one queue per 
 broker in a Kafka cluster
 -

 Key: KAFKA-253
 URL: https://issues.apache.org/jira/browse/KAFKA-253
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jun Rao
 Attachments: kafka-253.patch, kafka-253_v2.patch, kafka-253_v3.patch, 
 kafka-253_v4.patch, kafka-253_v5.patch

   Original Estimate: 168h
  Remaining Estimate: 168h

 Today, the async producer is associated with a particular broker instance, 
 just like the SyncProducer. The Producer maintains a producer pool of 
 sync/async producers, one per broker. Since the producer pool creates one 
 async producer per broker, we have multiple producer queues for one Producer 
 instance. 
 With replication, a topic partition will be logical. This requires 
 refactoring the AsyncProducer to be broker agnostic. As a side effect of this 
 refactoring, we should also ensure that we have only one queue per Producer 
 instance.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-02-03 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13199901#comment-13199901
 ] 

Jun Rao commented on KAFKA-240:
---

Joe, Prashanth,

Do you guys have patches that we can review now? We are getting very close to 
starting to use the new request format in 0.8.

Thanks,

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
 Fix For: 0.8


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-251) The ConsumerStats MBean's PartOwnerStats attribute is a string

2012-02-02 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13199447#comment-13199447
 ] 

Jun Rao commented on KAFKA-251:
---

Sorry, just got the chance to look at this. Couldn't apply the patch since 
trunk has moved. Could you rebase?

 The ConsumerStats MBean's PartOwnerStats  attribute is a string
 ---

 Key: KAFKA-251
 URL: https://issues.apache.org/jira/browse/KAFKA-251
 Project: Kafka
  Issue Type: Bug
Reporter: Pierre-Yves Ritschard
 Attachments: 0001-Incorporate-Jun-Rao-s-comments-on-KAFKA-251.patch, 
 0001-Provide-a-patch-for-KAFKA-251.patch


 The fact that the PartOwnerStats is a string prevents monitoring systems from 
 graphing consumer lag. There should be one mbean per [ topic, partition, 
 groupid ] group.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-254) A tool to GET Zookeeper partition-offset and output to files

2012-02-02 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13199458#comment-13199458
 ] 

Jun Rao commented on KAFKA-254:
---

Thanks for the patch. A few comments:
1. For consistency, let's use joptsimple for command line processing (take a 
look at some other tools as examples; a rough skeleton is sketched below).
2. We should use ZkClient, instead of the raw ZK client (see 
ConsumerOffsetChecker).
3. Could we extend this to support multiple consumer groups? We can use an 
option --groups to specify all groups. If the option is not specified, we can 
get all groups registered in ZK. We will need to include group name in the 
output file.
4. Use the zk common paths already defined in ZkUtils.

Those comments are applicable to kafka-255 too.
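
A rough skeleton along the lines of comments 1-3 (the object name, option names and ZK paths are assumptions for illustration, not the actual KAFKA-254 patch; real code would use the path constants in ZkUtils):

  import joptsimple.OptionParser
  import org.I0Itec.zkclient.ZkClient

  object ExportZkOffsetsSketch {
    def main(args: Array[String]): Unit = {
      val parser = new OptionParser
      val zkOpt     = parser.accepts("zkconnect", "ZooKeeper host:port")
                            .withRequiredArg().ofType(classOf[String])
      val groupsOpt = parser.accepts("groups", "comma-separated consumer groups; all groups if omitted")
                            .withRequiredArg().ofType(classOf[String])
      val outOpt    = parser.accepts("output-file", "output file pathname")
                            .withRequiredArg().ofType(classOf[String])
      val options  = parser.parse(args: _*)
      val zkClient = new ZkClient(options.valueOf(zkOpt))

      // if --groups is not given, fall back to every group registered under /consumers
      val groups: Seq[String] =
        if (options.has(groupsOpt)) options.valueOf(groupsOpt).split(",").toSeq
        else scala.collection.JavaConversions.asScalaBuffer(zkClient.getChildren("/consumers")).toSeq

      // for each group, read /consumers/<group>/offsets/<topic>/<partition> and write
      // "group topic partition offset" lines to options.valueOf(outOpt)  (omitted here)
    }
  }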

 A tool to GET Zookeeper partition-offset and output to files
 

 Key: KAFKA-254
 URL: https://issues.apache.org/jira/browse/KAFKA-254
 Project: Kafka
  Issue Type: Task
  Components: clients
Reporter: John Fung
Assignee: John Fung
 Attachments: kafka-254-v1.patch


 A utility that retrieves the offsets of all topic partitions in ZK for a 
 specified group id and saves the data to output files. A shell script also 
 comes with this tool to automate writing the data to files at a specified 
 time interval.
 This utility expects 3 arguments:
  1. Zk host:port string
  2. group Id
  3. Output file pathname

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer

2012-01-24 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13192287#comment-13192287
 ] 

Jun Rao commented on KAFKA-238:
---

3. I agree that the API is rarely used. However, each call could request a 
large number of topics. This is particularly true in the consumer, where we make 
multifetch requests (which may consist of hundreds of topics). I wasn't thinking of 
caching broker info permanently. Rather, we can just cache the broker info 
within a single getMetaData request (see the sketch after these comments). For 
example, if we have 200 topics (1 partition per topic) and 4 brokers, instead of 
making 200 ZK read requests, we just need to make 4 requests.
4. I am ok with tracking this in a separate jira.

Some new comments:
6. Remove unused imported package in AdminUtils, SyncProducerTest, etc.
7. TopicMetadataRequest.deserializeTopicMetadata should be something like 
deserializeTopicMetadataList.
8. TopicMetaData: keep comments consistent with implementation (e.g., 
doesLeaderExist is 1 byte now)
9. ZkUtils: we have a mix of getTopicPartition* and getTopicPart*. We can use 
either format. We just need to be consistent.


 add a getTopicMetaData method in broker and expose it to producer 
 --

 Key: KAFKA-238
 URL: https://issues.apache.org/jira/browse/KAFKA-238
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
Assignee: Neha Narkhede
 Attachments: kafka-238-v1.patch, kafka-238-v2.patch


 We need a way to propagate the leader and the partition information to the 
 producer so that it can do load balancing and semantic partitioning. One way 
 to do that is to have the producer get the information from ZK directly. This 
 means that the producer needs to maintain a ZK session and has to subscribe 
 to watchers, which can be complicated. An alternative approach is to have the 
 following api on the broker.
 TopicMetaData getTopicMetaData(String: topic)
 TopicMetaData {
   Array[PartitionMetaData]: partitionsMetaData
 }
 PartitionMetaData {
   Int: partitionId
   String: leaderHostname
   Int: leaderPort
 }
 Using this api, the producer can get the metadata about a topic during 
 initial startup or leadership change of a partition.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer

2012-01-24 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13192450#comment-13192450
 ] 

Jun Rao commented on KAFKA-238:
---

+1 on the patch. 

This patch doesn't wire up the getMetaData api in the producer/consumer. Is the 
intention to do that as part of kafka-239?

 add a getTopicMetaData method in broker and expose it to producer 
 --

 Key: KAFKA-238
 URL: https://issues.apache.org/jira/browse/KAFKA-238
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
Assignee: Neha Narkhede
 Attachments: kafka-238-v1.patch, kafka-238-v2.patch, 
 kafka-238-v3.patch


 We need a way to propagate the leader and the partition information to the 
 producer so that it can do load balancing and semantic partitioning. One way 
 to do that is to have the producer get the information from ZK directly. This 
 means that the producer needs to maintain a ZK session and has to subscribe 
 to watchers, which can be complicated. An alternative approach is to have the 
 following api on the broker.
 TopicMetaData getTopicMetaData(String: topic)
 TopicMetaData {
   Array[PartitionMetaData]: partitionsMetaData
 }
 PartitionMetaData {
   Int: partitionId
   String: leaderHostname
   Int: leaderPort
 }
 Using this api, the producer can get the metadata about a topic during 
 initial startup or leadership change of a partition.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-251) The ConsumerStats MBean's PartOwnerStats attribute is a string

2012-01-24 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13192736#comment-13192736
 ] 

Jun Rao commented on KAFKA-251:
---

Thanks for the patch. Some comments:

1. There is a compilation error.
2. Use space instead of TAB.
3. The new JMX bean seems to be intended to replace 
ZookeeperConsumerConnectorMBean. If so, we should remove 
ZookeeperConsumerConnectorMBean and its implementation in 
ZookeeperConsumerConnector.
4. Not every SyncRebalance triggers a successful rebalance (sometimes a rebalance 
is not necessary). So, instead of unregistering/registering the MBean in 
SyncRebalance, we should do unregisterMBeans followed by registerMBeans at the 
end of updateFetcher, which is when a rebalance really happens.
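
A minimal sketch of point 4 (the method names stand in for the per-[topic, partition, groupid] bean handling; this is not the actual patch): re-register the MBeans only at the end of updateFetcher, i.e. only when a rebalance has actually taken effect.

  trait ConsumerJmxSketch {
    def unregisterMBeans(): Unit    // drop beans for partitions released in this rebalance
    def registerMBeans(): Unit      // register one bean per [topic, partition, groupid] now owned

    def updateFetcher(): Unit = {
      // ... recompute the fetchers for the partitions this consumer now owns ...
      unregisterMBeans()
      registerMBeans()
    }
  }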



 The ConsumerStats MBean's PartOwnerStats  attribute is a string
 ---

 Key: KAFKA-251
 URL: https://issues.apache.org/jira/browse/KAFKA-251
 Project: Kafka
  Issue Type: Bug
Reporter: Pierre-Yves Ritschard
 Attachments: 0001-Provide-a-patch-for-KAFKA-251.patch


 The fact that the PartOwnerStats is a string prevents monitoring systems from 
 graphing consumer lag. There should be one mbean per [ topic, partition, 
 groupid ] group.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer

2012-01-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13191262#comment-13191262
 ] 

Jun Rao commented on KAFKA-238:
---

The patch doesn't apply to the 0.8 branch. Could you rebase?

can't find file to patch at input line 2260
Perhaps you used the wrong -p or --strip option?
The text leading up to this was:
--
|Index: core/src/main/scala/kafka/api/MultiMessageSetSend.scala
|===
|--- core/src/main/scala/kafka/api/MultiMessageSetSend.scala(revision 
1232541)
|+++ core/src/main/scala/kafka/api/MultiMessageSetSend.scala(working copy)


 add a getTopicMetaData method in broker and expose it to producer 
 --

 Key: KAFKA-238
 URL: https://issues.apache.org/jira/browse/KAFKA-238
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
Assignee: Neha Narkhede
 Attachments: kafka-238-v1.patch


 We need a way to propagate the leader and the partition information to the 
 producer so that it can do load balancing and semantic partitioning. One way 
 to do that is to have the producer get the information from ZK directly. This 
 means that the producer needs to maintain a ZK session and has to subscribe 
 to watchers, which can be complicated. An alternative approach is to have the 
 following api on the broker.
 TopicMetaData getTopicMetaData(String: topic)
 TopicMetaData {
   Array[PartitionMetaData]: partitionsMetaData
 }
 PartitionMetaData {
   Int: partitionId
   String: leaderHostname
   Int: leaderPort
 }
 Using this api, the producer can get the metadata about a topic during 
 initial startup or leadership change of a partition.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer

2012-01-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13191350#comment-13191350
 ] 

Jun Rao commented on KAFKA-238:
---

Some comments:

1. Use the new Logger helper for new log4j logging.
2. PartitionMetaData: The leader indicator just needs 1 byte, instead of 2.
3. AdminUtils.getTopicMetaDataFromZK(): Instead of getting broker info from ZK 
each time, could we collect a set of unique broker ids that are needed and call 
ZK only once for each unique id to get the broker info?
3. LogMetaData: LogMetaData is actually replica-level info, not partition-level 
info. Collecting such data in the same getTopicMetaData api is tricky since 
log meta data is only available at the broker that stores a replica locally. 
So, this info is not available at every broker; you have to talk to the right 
broker to get such information. One possibility is to keep this as a separate 
api, something like getReplicaMetadata(topic, partition, brokerid). Such 
information is only returned if the api is called on the broker that owns the 
partition.
4. SyncProducerTest: no need to extend ZooKeeperTestHarness since it's included 
in KafkaServerTestHarness now.
5. We probably need to separate ZK from LogManager. This allows us to use and 
test LogManager independently. We can probably attach a handler to the 
LogManager. This can be a separate jira.


 add a getTopicMetaData method in broker and expose it to producer 
 --

 Key: KAFKA-238
 URL: https://issues.apache.org/jira/browse/KAFKA-238
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
Assignee: Neha Narkhede
 Attachments: kafka-238-v1.patch


 We need a way to propagate the leader and the partition information to the 
 producer so that it can do load balancing and semantic partitioning. One way 
 to do that is to have the producer get the information from ZK directly. This 
 means that the producer needs to maintain a ZK session and has to subscribe 
 to watchers, which can be complicated. An alternative approach is to have the 
 following api on the broker.
 TopicMetaData getTopicMetaData(String: topic)
 TopicMetaData {
   Array[PartitionMetaData]: partitionsMetaData
 }
 PartitionMetaData {
   Int: partitionId
   String: leaderHostname
   Int: leaderPort
 }
 Using this api, the producer can get the metadata about a topic during 
 initial startup or leadership change of a partition.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-41) multi-produce and multi-fetch support with replication

2012-01-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-41?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13191446#comment-13191446
 ] 

Jun Rao commented on KAFKA-41:
--

Yes, this may be covered by the new wire protocol change and KAFKA-239.

 multi-produce and multi-fetch support with replication
 --

 Key: KAFKA-41
 URL: https://issues.apache.org/jira/browse/KAFKA-41
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao

 We need to figure out how to support multi-produce and multi-fetch smoothly 
 with the replication support. The client has to figure out which partitions 
 are collocated on the same broker and adjust accordingly when some partitions 
 are moved.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-01-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13191471#comment-13191471
 ] 

Jun Rao commented on KAFKA-240:
---

Joe,

The changes in github make sense. A couple of suggestions:
1. We can probably use a case class for ProduceRequest.
2. The return type of produce request handler should be Option[ProduceResponse] 
since the response is optional.
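
A hypothetical sketch of the two suggestions (field names and types are illustrative only, not the actual KAFKA-240 classes):

  case class ProduceRequest(correlationId: Int, clientId: String, requiredAcks: Short,
                            data: Map[(String, Int), Array[Byte]])

  case class ProduceResponse(correlationId: Int, errors: Map[(String, Int), Short])

  // The handler returns None when the producer asked for no acknowledgement.
  def handleProducerRequest(request: ProduceRequest): Option[ProduceResponse] = {
    // ... append the message sets to the local logs ...
    if (request.requiredAcks == 0) None
    else Some(ProduceResponse(request.correlationId, Map.empty))
  }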

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
 Fix For: 0.8


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-01-18 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13188716#comment-13188716
 ] 

Jun Rao commented on KAFKA-240:
---

Joe,

I think it could be simpler than that. You can have the following encoding:

# of topics (2 bytes)
topic1 encoding (see encoding format below)
...
topicN encoding

Topic Encoding:
topic name length (2 bytes)
topic name bytes
# of partitions (2 bytes)
partition1 encoding (see encoding format below)
...
partitionN encoding

Partition encoding:
partition id: (4 bytes)
messageSet encoding
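
A minimal sketch of writing that layout into a ByteBuffer (assuming each message set has already been serialized to raw bytes; this mirrors the description above, it is not the actual 0.8 wire code):

  import java.nio.ByteBuffer

  def encode(topics: Map[String, Map[Int, Array[Byte]]], buffer: ByteBuffer): Unit = {
    buffer.putShort(topics.size.toShort)                  // # of topics (2 bytes)
    for ((topic, partitions) <- topics) {
      val nameBytes = topic.getBytes("UTF-8")
      buffer.putShort(nameBytes.length.toShort)           // topic name length (2 bytes)
      buffer.put(nameBytes)                               // topic name bytes
      buffer.putShort(partitions.size.toShort)            // # of partitions (2 bytes)
      for ((partitionId, messageSetBytes) <- partitions) {
        buffer.putInt(partitionId)                        // partition id (4 bytes)
        buffer.put(messageSetBytes)                       // messageSet encoding
      }
    }
  }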

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
 Fix For: 0.8


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-245) upgrade to zkclient 0.1

2012-01-12 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13185177#comment-13185177
 ] 

Jun Rao commented on KAFKA-245:
---

Do we know the github revision/date corresponding to the zkclient 0.1 release?

 upgrade to zkclient 0.1
 ---

 Key: KAFKA-245
 URL: https://issues.apache.org/jira/browse/KAFKA-245
 Project: Kafka
  Issue Type: New Feature
  Components: core
Affects Versions: 0.7
Reporter: Pierre-Yves Ritschard
Assignee: Pierre-Yves Ritschard
  Labels: newbie
 Attachments: 
 0001-KAFKA-245-zkclient-0.1-to-enable-maven-syncing.patch, 
 0003-follow-up-to-KAFKA-245-use-maven-to-fetch-zkclient.patch


 The zkclient jar bundled with kafka should be synced with what is available 
 on maven central. The artifact which has group com.github.sgroschupf, 
 artifact id zkclient and version 0.1 is from a day after the one bundled with 
 kafka and should thus be sufficient for kafka's needs.
 I have tested it locally and found no regressions.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-232) ConsumerConnector has no access to getOffsetsBefore

2012-01-12 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13185190#comment-13185190
 ] 

Jun Rao commented on KAFKA-232:
---

We discussed this on the mailing list before: allowing the ZK-based consumer to 
consume from an arbitrary offset during startup. Here is the main 
complexity. Multiple consumers in the same group can consume a topic jointly. 
When they start up, which consumer sets the offset for which partitions? How do 
we prevent 2 consumers from setting the offset for the same partition?

 ConsumerConnector has no access to getOffsetsBefore 
 --

 Key: KAFKA-232
 URL: https://issues.apache.org/jira/browse/KAFKA-232
 Project: Kafka
  Issue Type: New Feature
Reporter: Edward Capriolo
Priority: Minor

 kafka.javaapi.SimpleConsumer has getOffsetsBefore. I would like this 
 ability in KafkaMessageStream or in ConsumerConnector. In this way clients 
 can access their current position.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-240) implement new producer and consumer request format

2012-01-11 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13184455#comment-13184455
 ] 

Jun Rao commented on KAFKA-240:
---

Joe,

That's a good question. We had some discussions on this in the mailing list 
before.

I agree it would be nice if we can make the 0.8 release backward compatible. 
However, it's a little bit hard because:
1. We need a process to migrate the ZK data structures and potentially the on-disk 
data organization. This is particularly hard since we need to migrate a whole 
kafka cluster online. In the middle of the migration, some brokers will be 
using the old structures and some other brokers will be using the new 
structures. It's not clear how a consumer should behave during this stage.
2. If something terribly wrong happens during the migration, there is no easy 
way to rollback the migration.

An alternative is to make the 0.8 release non-backward compatible. This allows 
us to incorporate any wire/on-disk changes freely. To upgrade from 0.7 to 0.8, 
one possibility is to start a new 0.8 cluster. We can provide a tool that 
continuously mirrors data from the 0.7 cluster to the new 0.8 cluster. Once 
that's done, we can first upgrade the consumers and point them to the 0.8 
cluster, followed by upgrading the producers and pointing them to the 0.8 
cluster. This is somewhat more operational work. However, it addresses both of 
the above issues. 

 implement new producer and consumer request format
 --

 Key: KAFKA-240
 URL: https://issues.apache.org/jira/browse/KAFKA-240
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao
 Fix For: 0.8


 We want to change the producer/consumer request/response format according to 
 the discussion in the following wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-202) Make the request processing in kafka asynchonous

2012-01-11 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13184626#comment-13184626
 ] 

Jun Rao commented on KAFKA-202:
---

Unit tests now pass. +1 on the patch.

We should probably commit this patch to an 0.8 branch.

 Make the request processing in kafka asynchonous
 

 Key: KAFKA-202
 URL: https://issues.apache.org/jira/browse/KAFKA-202
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-202-v2.patch, KAFKA-202-v3.patch, 
 KAFKA-202-v4.patch, KAFKA-202-v5.patch, KAFKA-202-v6.patch, 
 KAFKA-48-socket-server-refactor-draft.patch


 We need to handle long-lived requests to support replication. To make this 
 work we need to make the processing mechanism asynchronous from the network 
 threads.
 To accomplish this we will retain the existing pool of network threads but 
 add a new pool of request handling threads. These will do all the disk I/O. 
 There will be a queue mechanism to transfer requests to and from this 
 secondary pool.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-228) Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0

2012-01-06 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13181491#comment-13181491
 ] 

Jun Rao commented on KAFKA-228:
---

Thanks for the new patch. Overall, looks pretty good. A few extra comments:

4. ConsumerIterator: Could we make currentDataChunk a local variable in 
makeNext? This is an existing problem, but it would be good if we can fix it in 
this patch. Also, currentTopicInfo doesn't have to be atomic since it's never 
updated concurrently. 

5. It's better to set the KafkaMessageStreams in the constructor of 
ZKRebalanceListener. This way, even if the ZK listener gets triggered before 
the ZookeeperConsumerConnector.consume completes, KafkaMessageStreams is 
available in the rebalance call in the listener.

6. Fetcher: remove shutdown and keep only stopConnectionsToAllBrokers


 Reduce duplicate messages served by the kafka consumer for uncompressed 
 topics to 0
 ---

 Key: KAFKA-228
 URL: https://issues.apache.org/jira/browse/KAFKA-228
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Fix For: 0.7.1

 Attachments: kafka-228-v3.patch, kafka-228.patch, kafka-228_v2.patch


 Kafka guarantees at-least-once delivery of messages. The high level consumer 
 provides highly available partitioned consumption of data within the same 
 consumer group. In the event of broker failures or consumer failures within a 
 group, the high level consumer rebalances and redistributes the topic 
 partitions evenly amongst the consumers in a group. With the current design, 
 during this rebalancing operation, Kafka introduces duplicates in the 
 consumed data. 
 This JIRA improves the rebalancing operation and the consumer iterator design 
 to guarantee 0 duplicates while consuming uncompressed topics. There will be 
 a small number of duplicates while serving compressed data, but it will be 
 bound by the compression batch size.  

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-236) Make 'autooffset.reset' accept a delay in addition to {smallest,largest}

2012-01-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13180528#comment-13180528
 ] 

Jun Rao commented on KAFKA-236:
---

 To reset the offset, under the covers we use getLatestOffsetBefore(), which 
supports an arbitrary starting timestamp, in addition to smallest and largest. 
However, getLatestOffsetBefore has a very coarse granularity at the moment: 
it only returns the offset of the first message in a log segment file, based on 
the segment's last modified time. So, depending on the log segment size, this may 
or may not be good enough for some applications.
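
A small usage sketch, assuming the 0.7-era SimpleConsumer signature getOffsetsBefore(topic, partition, time, maxNumOffsets); the host, port and timestamp are placeholders:

  import kafka.consumer.SimpleConsumer

  val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024)
  val oneHourAgo = System.currentTimeMillis - 60 * 60 * 1000L
  // at most one offset requested; the granularity is the log segment, as noted above
  val offsets = consumer.getOffsetsBefore("test", 0, oneHourAgo, 1)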

 Make 'autooffset.reset' accept a delay in addition to {smallest,largest}
 

 Key: KAFKA-236
 URL: https://issues.apache.org/jira/browse/KAFKA-236
 Project: Kafka
  Issue Type: Improvement
Reporter: Mathias Herberts

 Add the possibility to specify a delay in ms which would be used when 
 resetting the offset.
 This would allow, for example, a client to specify that it would like its offset 
 to be reset to the first offset before/after (the current time - the given delay).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer

2012-01-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13180677#comment-13180677
 ] 

Jun Rao commented on KAFKA-238:
---

We can potentially piggy-back auto topic creation on the getTopicMetaData api. 
If auto topic creation is enabled and the broker sees no ZK data for a topic, 
the api will create the topic first.

 add a getTopicMetaData method in broker and expose it to producer 
 --

 Key: KAFKA-238
 URL: https://issues.apache.org/jira/browse/KAFKA-238
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao

 We need a way to propagate the leader and the partition information to the 
 producer so that it can do load balancing and semantic partitioning. One way 
 to do that is to have the producer get the information from ZK directly. This 
 means that the producer needs to maintain a ZK session and has to subscribe 
 to watchers, which can be complicated. An alternative approach is to have the 
 following api on the broker.
 TopicMetaData getTopicMetaData(String: topic)
 TopicMetaData {
   Array[PartitionMetaData]: partitionsMetaData
 }
 PartitionMetaData {
   Int: partitionId
   String: leaderHostname
   Int: leaderPort
 }
 Using this api, the producer can get the metadata about a topic during 
 initial startup or leadership change of a partition.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-232) ConsumerConnector has no access to getOffsetsBefore

2012-01-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13180841#comment-13180841
 ] 

Jun Rao commented on KAFKA-232:
---

Do you think you can add meta data in the message itself so that you know 
whether the consumed data has moved to the next minute? If so, you can turn off 
auto commit and call commitOffset() directly.
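
A minimal sketch of that approach, assuming the 0.7 consumer configuration names; the per-minute check itself is application logic, not part of Kafka:

  import java.util.Properties
  import kafka.consumer.{Consumer, ConsumerConfig}

  val props = new Properties()
  props.put("zk.connect", "localhost:2181")
  props.put("groupid", "my-group")
  props.put("autocommit.enable", "false")      // turn auto commit off
  val connector = Consumer.create(new ConsumerConfig(props))

  // ... iterate over the message streams; once the metadata embedded in a message shows
  // that consumption has crossed into the next minute, checkpoint explicitly:
  connector.commitOffsets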

 ConsumerConnector has no access to getOffsetsBefore 
 --

 Key: KAFKA-232
 URL: https://issues.apache.org/jira/browse/KAFKA-232
 Project: Kafka
  Issue Type: New Feature
Reporter: Edward Capriolo
Priority: Minor

 kafka.javaapi.SimpleConsumer has getOffsetsBefore. I would like this 
 ability in KafkaMessageStream or in ConsumerConnector. In this way clients 
 can access their current position.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer

2012-01-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13180855#comment-13180855
 ] 

Jun Rao commented on KAFKA-238:
---

I can see it's useful for the api to support a list of topics. Returning all 
topics is probably too much overhead since most producers are interested in a 
subset of topics.

In our current replication design, both send and fetch requests are served only 
by the leader. We can balance the load by having multiple partitions per 
broker. We may allow followers to serve reads in the future. I don't see a clear 
benefit at this moment though. 

 add a getTopicMetaData method in broker and expose it to producer 
 --

 Key: KAFKA-238
 URL: https://issues.apache.org/jira/browse/KAFKA-238
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jun Rao

 We need a way to propagate the leader and the partition information to the 
 producer so that it can do load balancing and semantic partitioning. One way 
 to do that is to have the producer get the information from ZK directly. This 
 means that the producer needs to maintain a ZK session and has to subscribe 
 to watchers, which can be complicated. An alternative approach is to have the 
 following api on the broker.
 TopicMetaData getTopicMetaData(String: topic)
 TopicMetaData {
   Array[PartitionMetaData]: partitionsMetaData
 }
 PartitionMetaData {
   Int: partitionId
   String: leaderHostname
   Int: leaderPort
 }
 Using this api, the producer can get the metadata about a topic during 
 initial startup or leadership change of a partition.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-202) Make the request processing in kafka asynchonous

2012-01-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13181141#comment-13181141
 ] 

Jun Rao commented on KAFKA-202:
---

Overall, the patch looks good. Some comments:

1. KafkaServer.startup doesn't have to catch the exception and shut down. The 
caller in KafkaServerStartable already does that. Plus, it shuts down the embedded 
consumer appropriately if needed.

2. There is KafkaRequestHandlers.scala.rej in the patch.

3. Unit test seems to fail occasionally, giving the following error.
[info] == core-kafka / kafka.integration.LazyInitProducerTest ==
[2012-01-05 21:57:38,773] ERROR Error processing MultiProducerRequest on test:0 
(kafka.server.KafkaApis:82)
java.nio.channels.ClosedChannelException
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
at 
kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
at kafka.log.Log.append(Log.scala:215)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
at 
kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
at 
kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64)
at kafka.server.KafkaApis.handle(KafkaApis.scala:43)
at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,773] ERROR Error processing ProduceRequest on test:0 
(kafka.server.KafkaApis:82)
java.nio.channels.ClosedChannelException
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
at 
kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
at kafka.log.Log.append(Log.scala:215)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while 
handling producer request: null (kafka.server.KafkaApis:92)
java.nio.channels.ClosedChannelException
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
at 
kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
at kafka.log.Log.append(Log.scala:215)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while 
handling producer request: null (kafka.server.KafkaApis:92)
java.nio.channels.ClosedChannelException
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
at 
kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
at 

[jira] [Commented] (KAFKA-200) Support configurable send / receive socket buffer size in server

2012-01-04 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13179977#comment-13179977
 ] 

Jun Rao commented on KAFKA-200:
---

John,

Thanks for the detailed update. From TCP Illustrated, section 13.3.3, last 
paragraph: "The shift count is automatically chosen by TCP, based on the size 
of the receive buffer." So, to set a TCP window size larger than 64K, we just 
need to make sure the receive buffer size is set properly before the connection 
is established. Approach 3 as you listed is therefore the right thing to do. Could 
you upload a new patch?
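
A minimal sketch of Approach 3 in plain java.net/java.nio terms (not the actual KAFKA-200 patch): set SO_RCVBUF on the server socket before binding, so the window scale is negotiated from the larger buffer.

  import java.net.InetSocketAddress
  import java.nio.channels.ServerSocketChannel

  val socketReceiveBuffer = 1048576                    // from socket.receive.buffer
  val serverChannel = ServerSocketChannel.open()
  serverChannel.socket().setReceiveBufferSize(socketReceiveBuffer)   // must happen before bind()
  serverChannel.socket().bind(new InetSocketAddress(9092))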

 Support configurable send / receive socket buffer size in server
 

 Key: KAFKA-200
 URL: https://issues.apache.org/jira/browse/KAFKA-200
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: John Fung
 Fix For: 0.8

 Attachments: KAFKA-200.patch


 * Make the send / receive socket buffer size configurable in server.
 * KafkaConfig.scala already has the following existing variables to support 
 send / receive buffer:
 socketSendBuffer
 socketReceiveBuffer
 * The patch attached to this ticket will read the following existing settings 
 in kafka/config/server.properties and set the corresponding socket buffers
 . . .
 # The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer=1048576
 # The receive buffer (SO_RCVBUF) used by the socket server
 socket.receive.buffer=1048576

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-232) ConsumerConnector has no access to getOffsetsBefore

2012-01-03 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13179211#comment-13179211
 ] 

Jun Rao commented on KAFKA-232:
---

Could you explain in a bit more detail how you plan to use this method? Today, 
ConsumerConnector doesn't support consuming from an arbitrary offset.

 ConsumerConnector has no access to getOffsetsBefore 
 --

 Key: KAFKA-232
 URL: https://issues.apache.org/jira/browse/KAFKA-232
 Project: Kafka
  Issue Type: New Feature
Reporter: Edward Capriolo
Priority: Minor

 kafka.javaapi.SimpleConsumer has getOffsetsBefore. I would like this 
 ability in KafkaMessageStream or in ConsumerConnector. In this way clients 
 can access their current position.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request

2012-01-02 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13178466#comment-13178466
 ] 

Jun Rao commented on KAFKA-48:
--

Taylor,

Sorry for the late response. I am not sure that I understand your proposal. 

a. Why do we need a local socket? It seems that the same thing can be achieved 
by just turning on the write-interest bit in the socket key corresponding to 
a client request.

b. It's not clear to me how you correlate a queued client request with the 
corresponding client socket.

 Implement optional long poll support in fetch request
 ---

 Key: KAFKA-48
 URL: https://issues.apache.org/jira/browse/KAFKA-48
 Project: Kafka
  Issue Type: Bug
Reporter: Alan Cabrera
Assignee: Jay Kreps

 Currently, the fetch request is non-blocking. If there is nothing on the 
 broker for the consumer to retrieve, the broker simply returns an empty set 
 to the consumer. This can be inefficient, if you want to ensure low-latency 
 because you keep polling over and over. We should make a blocking version of 
 the fetch request so that the fetch request is not returned until the broker 
 has at least one message for the fetcher or some timeout passes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-228) Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0

2011-12-19 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13172441#comment-13172441
 ] 

Jun Rao commented on KAFKA-228:
---

Thanks for the patch. Some comments below:

1. This jira is a duplicate of kafka-198. We need to mark it.
2. ZookeeperConsumerConnector
2.1 commitOffsets(): the "committing all offsets" logging should be at trace or 
debug level.
2.2 The patch commits all offsets when clearing every stream. This is 
unnecessary. We just need to commit all offsets once after we have cleared all 
the streams. Also, we don't need to clear all streams; we only need to clear a 
stream whose fetch queue needs to be cleared. Here is one way of doing that. We 
keep a reference to the stream in each fetch queue (there is a 1:1 mapping between 
stream and fetch queue). In closeFetchers(), we (1) stop the current fetcher; 
(2) for each fetch queue to be cleared: clear the queue and clear the 
corresponding stream; (3) commit all offsets. Then, we don't need 
Fetcher.clearFetcherQueues and we don't need to pass in kafkaMessageStreams in 
Fetcher.startConnections. Also, ConsumerIterator.clearCurrentChunk doesn't need 
to take any input parameters.

3. ConsumerIterator:
The logic is a bit complicated and I am not sure if it's really necessary. To 
me, it seems all we need is to make currentDataChunk an AtomicReference. 
clearCurrentChunk() simply sets the reference to null. This will be the only 
synchronization that we need (no need for lock). Because of the ordering in 
2.2, this will make sure the next hasNext() call on the stream blocks until the 
Fetcher is started again.
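
A minimal sketch of that suggestion (illustrative only; the type parameter stands in for whatever the fetch queue actually holds):

  import java.util.concurrent.atomic.AtomicReference

  class ConsumerIteratorSketch[T] {
    private val currentDataChunk = new AtomicReference[T](null.asInstanceOf[T])

    // clearCurrentChunk() simply drops the reference; no extra lock is needed
    def clearCurrentChunk(): Unit = currentDataChunk.set(null.asInstanceOf[T])

    // makeNext refills the reference from the fetch queue when it is empty
    def makeNext(dequeueNext: () => T): T = {
      if (currentDataChunk.get == null)
        currentDataChunk.set(dequeueNext())   // blocks until the restarted Fetcher produces data
      currentDataChunk.get
    }
  }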


 Reduce duplicate messages served by the kafka consumer for uncompressed 
 topics to 0
 ---

 Key: KAFKA-228
 URL: https://issues.apache.org/jira/browse/KAFKA-228
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.7
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Fix For: 0.7.1

 Attachments: kafka-228.patch


 Kafka guarantees at-least-once delivery of messages. The high level consumer 
 provides highly available partitioned consumption of data within the same 
 consumer group. In the event of broker failures or consumer failures within a 
 group, the high level consumer rebalances and redistributes the topic 
 partitions evenly amongst the consumers in a group. With the current design, 
 during this rebalancing operation, Kafka introduces duplicates in the 
 consumed data. 
 This JIRA improves the rebalancing operation and the consumer iterator design 
 to guarantee 0 duplicates while consuming uncompressed topics. There will be 
 a small number of duplicates while serving compressed data, but it will be 
 bound by the compression batch size.  

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-222) Mavenize contrib

2011-12-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13163235#comment-13163235
 ] 

Jun Rao commented on KAFKA-222:
---

+1

 Mavenize contrib
 

 Key: KAFKA-222
 URL: https://issues.apache.org/jira/browse/KAFKA-222
 Project: Kafka
  Issue Type: Task
  Components: contrib
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Blocker
 Fix For: 0.7

 Attachments: kafka-222.patch, rm_jars.sh


 To reduce the overhead of maintaining the NOTICE and LICENSE files, we need 
 to mavenize most checked-in jars. Out of the current checked-in jars, the 
 following ones cannot be mavenized -
 1. lib/sbt-launch.jar: This is required to fire up the build system of Kafka 
 (SBT)
 2. core/lib/zkclient-20110412.jar: Kafka uses a patched zkclient jar
 3. contrib/hadoop-consumer/lib/piggybank.jar: This is not available through 
 Maven
 4. contrib/hadoop-consumer/lib/pig-0.8.0-core.jar: This is also not available 
 through Maven

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-212) IllegalThreadStateException in topic watcher for Kafka mirroring

2011-11-30 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13160491#comment-13160491
 ] 

Jun Rao commented on KAFKA-212:
---

We shouldn't be leaking threads. If we reach the code that creates new MirrorThreads, 
the old threads must have finished, since shutdown is blocking. If shutdown blocked 
forever, we would never get to create new threads, so again there would be no thread 
leak (although that case would suggest another serious bug somewhere else).

+1 on the patch.
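
For reference, the underlying JDK behavior that causes the exception (plain Java 
threading, not Kafka code): a Thread instance can only be started once, so a fresh 
Thread has to be created for each new set of consumer threads.

  val t = new Thread(new Runnable { def run() { println("consuming") } })
  t.start()
  t.join()
  // t.start()            // would now throw java.lang.IllegalThreadStateException
  val t2 = new Thread(new Runnable { def run() { println("consuming again") } })
  t2.start()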

 IllegalThreadStateException in topic watcher for Kafka mirroring
 

 Key: KAFKA-212
 URL: https://issues.apache.org/jira/browse/KAFKA-212
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Fix For: 0.7.1

 Attachments: KAFKA-212.patch


 If the kafka mirroring embedded consumer receives a new topic watcher 
 notification, it runs into the following exception 
 [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException 
 (kafka.consumer.ZookeeperTopicEventWatcher)
 [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException
 at java.lang.Thread.start(Thread.java:595)
 at 
 kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142)
 at 
 kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142)
 at 
 scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
 at scala.collection.immutable.List.foreach(List.scala:45)
 at 
 kafka.server.EmbeddedConsumer.startNewConsumerThreads(KafkaServerStartable.scala:142)
 at 
 kafka.server.EmbeddedConsumer.handleTopicEvent(KafkaServerStartable.scala:109)
 at 
 kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree2$1(ZookeeperTopicEventWatcher.scala:83)
 at 
 kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:78)
 at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
 at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
  (kafka.consumer.ZookeeperTopicEventWatcher)
 This happens because it tries to start a thread that has already finished executing.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-210) javaapi ZookeeperConsumerConnectorTest duplicates many tests in the scala version

2011-11-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13155956#comment-13155956
 ] 

Jun Rao commented on KAFKA-210:
---

Yes, I am saying that there is no need to stress test the javaapi since it's 
just a wrapper. All we need to test is that the java-to-scala conversion 
works. So one basic test should be enough for this purpose.

 javaapi ZookeeperConsumerConnectorTest duplicates many tests in the scala 
 version
 -

 Key: KAFKA-210
 URL: https://issues.apache.org/jira/browse/KAFKA-210
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Jun Rao
Assignee: Jun Rao
 Attachments: KAFKA-210.patch


 Since javaapi.ZookeeperConsumerConnector is just a thin wrapper over the 
 scala version, we only need to test the basic functionality there.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-204) BoundedByteBufferReceive hides OutOfMemoryError

2011-11-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13156061#comment-13156061
 ] 

Jun Rao commented on KAFKA-204:
---

Ok, I am fine with the patch then. Any objection to commit it?

 BoundedByteBufferReceive hides OutOfMemoryError
 ---

 Key: KAFKA-204
 URL: https://issues.apache.org/jira/browse/KAFKA-204
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.7
Reporter: Chris Burroughs
Assignee: Chris Burroughs
Priority: Critical
 Attachments: k204-v1.txt


   private def byteBufferAllocate(size: Int): ByteBuffer = {
     var buffer: ByteBuffer = null
     try {
       buffer = ByteBuffer.allocate(size)
     }
     catch {
       case e: OutOfMemoryError =>
         throw new RuntimeException("OOME with size " + size, e)
       case e2 =>
         throw e2
     }
     buffer
   }
 This hides the fact that an Error occurred, and will likely result in some 
 log handler printing a message, instead of exiting with non-zero status.  
 Knowing how large the allocation was that caused an OOM is really nice, so 
 I'd suggest logging in byteBufferAllocate and then re-throwing 
 OutOfMemoryError

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer

2011-11-23 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13156377#comment-13156377
 ] 

Jun Rao commented on KAFKA-101:
---

Taylor, this would suggest a synchronization issue during append to the log. 
Looking at the code, the append is always synchronized on the lock of the 
segment file. Could you reproduce the problem on 0.7? 

 Avoid creating a new topic by the consumer
 --

 Key: KAFKA-101
 URL: https://issues.apache.org/jira/browse/KAFKA-101
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Jun Rao
  Labels: patch
 Fix For: 0.7

 Attachments: 
 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7, 
 KAFKA-101-getoffsets.patch, KAFKA-101_v2.patch, KAFKA-101_v3.patch


 Currently, if  a consumer consumes a topic and the topic doesn't exist, the 
 topic is created automatically. Sometimes this can be confusing. Often, an ad 
 hoc user may put a wrong topic name in the consumer and thus create many 
 unnecessary topics.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.

2011-11-22 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13155240#comment-13155240
 ] 

Jun Rao commented on KAFKA-49:
--

Prashanth, thanks for getting started on this. I agree with your second 
approach. Basically, add a new parameter in SyncProducer.send/multisend to 
indicate whether an ack is needed or not. The high level producer can then set 
that parameter based on ProducerConfig (probably true for sync mode and false 
for async mode). 

Another question is what kind of ack the broker should send back. A simple 
approach is to send back a boolean. Another possibility is to return, for each 
partition in the produce request, the latest offset after the request is 
served. Some clients could potentially make use of the returned offset (see the 
sketch below).
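
For illustration, the two ack shapes discussed above could look roughly like this 
(purely illustrative case classes, not the actual Kafka wire format):

  // simplest form: a single success flag for the whole produce request
  case class BooleanAck(success: Boolean)

  // richer form: for every (topic, partition) in the produce request, return the
  // latest offset after the append, so interested clients can track their position
  case class PartitionAck(topic: String, partition: Int, offsetAfterAppend: Long)
  case class ProduceAckSketch(acks: Seq[PartitionAck])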

 Add acknowledgement to the produce request.
 ---

 Key: KAFKA-49
 URL: https://issues.apache.org/jira/browse/KAFKA-49
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao

 Currently, the produce request doesn't get acknowledged. We need to have a 
 broker send a response to the producer and have the producer wait for the 
 response before sending the next request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-211) Fix LICENSE file to include MIT and SCALA license

2011-11-21 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13154690#comment-13154690
 ] 

Jun Rao commented on KAFKA-211:
---

+1

 Fix LICENSE file to include MIT and SCALA license
 -

 Key: KAFKA-211
 URL: https://issues.apache.org/jira/browse/KAFKA-211
 Project: Kafka
  Issue Type: Task
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Fix For: 0.7

 Attachments: KAFKA-211.patch


 See here for reference - 
 http://markmail.org/search/?q=kafka+0.7.0#query:kafka%200.7.0%20list%3Aorg.apache.incubator.general+page:1+mid:fiatvda3uyx2lbzb+state:results
 Looks like the LICENSE file should include MIT and SCALA license too

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-209) Remove empty directory when no log segments remain

2011-11-20 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13153918#comment-13153918
 ] 

Jun Rao commented on KAFKA-209:
---

Actually, when all log segments are deleted, we automatically create an empty 
log file whose name is the latest offset. So the log directory will never be 
empty.

 Remove empty directory when no log segments remain
 --

 Key: KAFKA-209
 URL: https://issues.apache.org/jira/browse/KAFKA-209
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Taylor Gautier

 When the log cleaner runs it deletes segments from the log directory.  
 However, if all the segments are cleaned, the log directory itself is 
 retained.  There doesn't seem to be any need for this as the directory will 
 be re-created on write if necessary.
 This JIRA would improve the log cleaner to also remove unused topic 
 directories from the log directory.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-204) BoundedByteBufferReceive hides OutOfMemoryError

2011-11-20 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13153919#comment-13153919
 ] 

Jun Rao commented on KAFKA-204:
---

Hmm, it doesn't look like OutOfMemoryError allows one to specify a cause during 
initialization.

 BoundedByteBufferReceive hides OutOfMemoryError
 ---

 Key: KAFKA-204
 URL: https://issues.apache.org/jira/browse/KAFKA-204
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.7
Reporter: Chris Burroughs
Assignee: Chris Burroughs
Priority: Critical
 Attachments: k204-v1.txt


   private def byteBufferAllocate(size: Int): ByteBuffer = {
     var buffer: ByteBuffer = null
     try {
       buffer = ByteBuffer.allocate(size)
     }
     catch {
       case e: OutOfMemoryError =>
         throw new RuntimeException("OOME with size " + size, e)
       case e2 =>
         throw e2
     }
     buffer
   }
 This hides the fact that an Error occurred, and will likely result in some 
 log handler printing a message, instead of exiting with non-zero status.  
 Knowing how large the allocation was that caused an OOM is really nice, so 
 I'd suggest logging in byteBufferAllocate and then re-throwing 
 OutOfMemoryError

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-209) Remove empty directory when no log segments remain

2011-11-20 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13153987#comment-13153987
 ] 

Jun Rao commented on KAFKA-209:
---

It should behave that way in both 0.6 and 0.7. If not, it's a bug.

 Remove empty directory when no log segments remain
 --

 Key: KAFKA-209
 URL: https://issues.apache.org/jira/browse/KAFKA-209
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Taylor Gautier

 When the log cleaner runs it deletes segments from the log directory.  
 However, if all the segments are cleaned, the log directory itself is 
 retained.  There doesn't seem to be any need for this as the directory will 
 be re-created on write if necessary.
 This JIRA would improve the log cleaner to also remove unused topic 
 directories from the log directory.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-204) BoundedByteBufferReceive hides OutOfMemoryError

2011-11-16 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13151297#comment-13151297
 ] 

Jun Rao commented on KAFKA-204:
---

The main reason this was added is to show the requested size and the caller 
that triggered such a request. It would be nice if both pieces were logged 
together. With the new patch, those two pieces are logged separately (although 
they should be close to each other in the log), and someone has to link them up 
manually.
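
For what it's worth, the suggestion in this thread (log the size, then rethrow the 
original Error) could look roughly like the following; this is only a sketch, not 
the committed patch:

  import java.nio.ByteBuffer
  import org.apache.log4j.Logger

  object AllocSketch {
    private val logger = Logger.getLogger(getClass)

    // log the requested size next to the error, then rethrow the OutOfMemoryError so
    // the Error is not swallowed and the process can still exit with non-zero status
    def byteBufferAllocate(size: Int): ByteBuffer = {
      try {
        ByteBuffer.allocate(size)
      } catch {
        case e: OutOfMemoryError =>
          logger.error("OOME with size " + size, e)
          throw e
      }
    }
  }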

 BoundedByteBufferReceive hides OutOfMemoryError
 ---

 Key: KAFKA-204
 URL: https://issues.apache.org/jira/browse/KAFKA-204
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.7
Reporter: Chris Burroughs
Assignee: Chris Burroughs
Priority: Critical
 Attachments: k204-v1.txt


   private def byteBufferAllocate(size: Int): ByteBuffer = {
     var buffer: ByteBuffer = null
     try {
       buffer = ByteBuffer.allocate(size)
     }
     catch {
       case e: OutOfMemoryError =>
         throw new RuntimeException("OOME with size " + size, e)
       case e2 =>
         throw e2
     }
     buffer
   }
 This hides the fact that an Error occurred, and will likely result in some 
 log handler printing a message, instead of exiting with non-zero status.  
 Knowing how large the allocation was that caused an OOM is really nice, so 
 I'd suggest logging in byteBufferAllocate and then re-throwing 
 OutOfMemoryError

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer

2011-11-14 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150246#comment-13150246
 ] 

Jun Rao commented on KAFKA-101:
---

Actually, another thing: if a topic doesn't already exist, the consumer shouldn't 
create the ZK path /broker/topics/topic. Currently, we do the following in 
ZookeeperConsumerConnector.consume:

  ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath)

We should remove that line. There are a couple of things to check.
(1) Whether the consumer still gets its watcher triggered when the topic is created 
(by a producer). I think this should work, since ZkClient registers an exists watcher 
if the path doesn't exist, but we should verify (see the sketch below).
(2) There are probably places in the rebalance code where a missing ZK path 
/broker/topics/topic needs to be handled properly.
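
One way to check (1), sketched with a plain ZkClient subscription (the connect string 
and topic path here are placeholders):

  import java.util.{List => JList}
  import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

  val zkClient = new ZkClient("localhost:2181")
  // subscribe before the topic path exists; the listener should still fire once a
  // producer creates the path
  zkClient.subscribeChildChanges("/broker/topics/my-topic", new IZkChildListener {
    def handleChildChange(parentPath: String, currentChilds: JList[String]) {
      println("children of " + parentPath + ": " + currentChilds)
    }
  })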

 Avoid creating a new topic by the consumer
 --

 Key: KAFKA-101
 URL: https://issues.apache.org/jira/browse/KAFKA-101
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Jun Rao
  Labels: patch
 Attachments: 
 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7


 Currently, if  a consumer consumes a topic and the topic doesn't exist, the 
 topic is created automatically. Sometimes this can be confusing. Often, an ad 
 hoc user may put a wrong topic name in the consumer and thus create many 
 unnecessary topics.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-50) kafka intra-cluster replication support

2011-11-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-50?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149400#comment-13149400
 ] 

Jun Rao commented on KAFKA-50:
--

The dependencies of the sub-jiras look like the following:

48
49
47 <- 46 <- 43
 44/45 <- 42
 41
  
This means that initially, 47, 48 and 49 can be worked on independently.

 kafka intra-cluster replication support
 ---

 Key: KAFKA-50
 URL: https://issues.apache.org/jira/browse/KAFKA-50
 Project: Kafka
  Issue Type: New Feature
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8

 Attachments: kafka_replication_highlevel_design.pdf, 
 kafka_replication_lowlevel_design.pdf


 Currently, Kafka doesn't have replication. Each log segment is stored in a 
 single broker. This limits both the availability and the durability of Kafka. 
 If a broker goes down, all log segments stored on that broker become 
 unavailable to consumers. If a broker dies permanently (e.g., disk failure), 
 all unconsumed data on that node is lost forever. Our goal is to replicate 
 every log segment to multiple broker nodes to improve both the availability 
 and the durability. 
 We'd like to support the following in Kafka replication: 
 1. Configurable synchronous and asynchronous replication 
 2. Small unavailable window (e.g., less than 5 seconds) during broker 
 failures 
 3. Auto recovery when a failed broker rejoins 
 4. Balanced load when a broker fails (i.e., the load on the failed broker is 
 evenly spread among multiple surviving brokers)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer

2011-11-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149414#comment-13149414
 ] 

Jun Rao commented on KAFKA-101:
---

Thanks for the patch.

The test failures seem to be transient and are the result of timing 
dependencies, which we should fix separately.

Could you add a unit test on SimpleConsumer that consumes a non-existent 
topic/partition? It should return an empty message set. Other than that, the 
patch looks good. 
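
Roughly something like the following (assuming the 0.7-era SimpleConsumer and 
FetchRequest signatures; the host, port and buffer sizes are placeholders):

  import kafka.api.FetchRequest
  import kafka.consumer.SimpleConsumer

  def testFetchFromNonExistentTopic() {
    val consumer = new SimpleConsumer("localhost", 9092, 100, 64 * 1024)
    try {
      val messages = consumer.fetch(new FetchRequest("no-such-topic", 0, 0L, 10000))
      // the fetch should return an empty message set rather than auto-creating the topic
      assert(!messages.iterator.hasNext)
    } finally {
      consumer.close()
    }
  }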

 Avoid creating a new topic by the consumer
 --

 Key: KAFKA-101
 URL: https://issues.apache.org/jira/browse/KAFKA-101
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Jun Rao
  Labels: patch
 Attachments: 
 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7


 Currently, if  a consumer consumes a topic and the topic doesn't exist, the 
 topic is created automatically. Sometimes this can be confusing. Often, an ad 
 hoc user may put a wrong topic name in the consumer and thus create many 
 unnecessary topics.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-201) Support for mirroring from multiple sources

2011-11-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149423#comment-13149423
 ] 

Jun Rao commented on KAFKA-201:
---

This should be relatively easy. We can just extend KafkaServerStartable to take an 
array of ConsumerConfigs and create multiple instances of EmbeddedConsumer.
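
Sketched out, the idea is just a small wrapper (ConsumerConfig and EmbeddedConsumer 
are stand-in traits here, not the real 0.7 classes):

  trait ConsumerConfig
  trait EmbeddedConsumer { def startup(): Unit; def shutdown(): Unit }

  class MirroringStartableSketch(consumerConfigs: Seq[ConsumerConfig],
                                 newConsumer: ConsumerConfig => EmbeddedConsumer) {
    // one embedded consumer per source cluster
    private val consumers: Seq[EmbeddedConsumer] = consumerConfigs.map(newConsumer)

    def startup(): Unit = consumers.foreach(_.startup())
    def shutdown(): Unit = consumers.foreach(_.shutdown())
  }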

 Support for mirroring from multiple sources
 ---

 Key: KAFKA-201
 URL: https://issues.apache.org/jira/browse/KAFKA-201
 Project: Kafka
  Issue Type: New Feature
Reporter: Paul Querna

 Currently the EmbeddedConsumer is configured against a single source mirror.
 We have a use case of consuming from multiple sources clusters.
 Simple example, we have 3 datacenters which are collecting data: A, B, C.
 We want all 3 to get full copies of the data eventually, by using a  on a 
 whitelist of topics, and having the topics include the source data centers.
 So, we would like to be able to:
 Configure A to mirror B, and C with a whitelist of topics B,C
 Configure B to mirror A, and C with a whitelist of topics A,C
 Configure C to mirror A, and B with a whitelist of topics A,B

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer

2011-11-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149426#comment-13149426
 ] 

Jun Rao commented on KAFKA-101:
---

Yes, since a test in SimpleConsumer will further test the wire protocol. Thanks,

 Avoid creating a new topic by the consumer
 --

 Key: KAFKA-101
 URL: https://issues.apache.org/jira/browse/KAFKA-101
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Jun Rao
  Labels: patch
 Attachments: 
 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7


 Currently, if  a consumer consumes a topic and the topic doesn't exist, the 
 topic is created automatically. Sometimes this can be confusing. Often, an ad 
 hoc user may put a wrong topic name in the consumer and thus create many 
 unnecessary topics.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-101) Avoid creating a new topic by the consumer

2011-11-13 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149442#comment-13149442
 ] 

Jun Rao commented on KAFKA-101:
---

We can add it in PrimitiveApiTest.

 Avoid creating a new topic by the consumer
 --

 Key: KAFKA-101
 URL: https://issues.apache.org/jira/browse/KAFKA-101
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: Jun Rao
  Labels: patch
 Attachments: 
 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7


 Currently, if  a consumer consumes a topic and the topic doesn't exist, the 
 topic is created automatically. Sometimes this can be confusing. Often, an ad 
 hoc user may put a wrong topic name in the consumer and thus create many 
 unnecessary topics.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira



