[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497644#comment-14497644 ]
Rekha Joshi commented on KAFKA-2098:

Thanks for a prompt reply [~jkreps]. What works for you works for me :-) But just for my curiosity, can you please share a link to this Apache requirement? Samza is also a top-level Apache project and it has https://github.com/apache/samza/tree/master/gradle/wrapper Thanks!

Gradle Wrapper Jar gone missing in 0.8.2.1
Key: KAFKA-2098
URL: https://issues.apache.org/jira/browse/KAFKA-2098
Project: Kafka
Issue Type: Bug
Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi
Assignee: Rekha Joshi

./gradlew idea
Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
This was working in 0.8.2. Attaching patch. Thanks
[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497616#comment-14497616 ]
Jason Rosenberg commented on KAFKA-2125:

So, I'd think the exception there before the 'NoSuchElementException' seems relevant (since it is complaining about the one partition that is then complained about repeatedly following).

Infinite loop after controlled shutdown succeeds
Key: KAFKA-2125
URL: https://issues.apache.org/jira/browse/KAFKA-2125
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
Attachments: grep_shut_edited.log

I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled shutdown completed successfully, after about 800ms. But after that, the server failed to shut down and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread' and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes), it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing its partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along).

Controlled shutdown starts and succeeds:
{code}
2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down
2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown
2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45
...
2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded
{code}

Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so:
{code}
2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) -> (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests
2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) -> (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
2015-04-14 05:56:11,582 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to
{code}
[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497680#comment-14497680 ]
Ewen Cheslack-Postava commented on KAFKA-2098:

The original issue is here: https://issues.apache.org/jira/browse/KAFKA-1490
The first post of the mailing list thread linked to in that post gives a good rundown of the issues: http://mail-archives.apache.org/mod_mbox/incubator-general/201406.mbox/%3CCADiKvVs%3DtKDbp3TWRnxds5dVepqcX4kWeYbj7xUx%2BZoDNM_Lyg%40mail.gmail.com%3E

Sadly, the deeper you dig, the more confusing things get. Export vs. checkout is hinted at here: http://incubator.apache.org/guides/releasemanagement.html#best-practice-source which would indicate you can include binaries in the repo as long as they aren't included in the release. https://www.apache.org/dev/release-publishing.html#valid talks about all source code being appropriately licensed and under the right CLA, the latter potentially being a problem for an Apache source release since gradle wouldn't be covered by an Apache CLA.

As a side note, downstream packagers sometimes dislike any binaries in source releases. See, e.g., https://www.debian.org/doc/packaging-manuals/java-policy/x84.html which indicates Debian maintainers regularly need to repack source releases to remove the binary bits. Making sure binaries don't make it into source releases makes things a lot less painful for packagers.

If you look into the source release for Samza, you'll see it doesn't contain any jars. They're stripped out during the release process (I haven't checked how, but just run find on the source tgz contents -- you won't find any jars). This matches the export vs. checkout distinction. It also addresses the potential CLA issues with a source release. I'm guessing that, given the complaints about the missing wrapper, this might be the right thing to do to appease Apache, downstream packagers, and developers/users. Personally, I'm surprised this causes as much frustration as it does -- the only time it's been an issue for me is on platforms where the native gradle packages are too old. But

Gradle Wrapper Jar gone missing in 0.8.2.1
Key: KAFKA-2098
URL: https://issues.apache.org/jira/browse/KAFKA-2098
[jira] [Updated] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.
[ https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sasaki Toru updated KAFKA-2128:
Status: Open (was: Patch Available)

kafka.Kafka should return non-zero exit code when caught exception.
Key: KAFKA-2128
URL: https://issues.apache.org/jira/browse/KAFKA-2128
[jira] [Updated] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.
[ https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sasaki Toru updated KAFKA-2128:
Status: Patch Available (was: Open)
Attachments: KAFKA-2128-1.patch

kafka.Kafka should return non-zero exit code when caught exception.
Key: KAFKA-2128
URL: https://issues.apache.org/jira/browse/KAFKA-2128
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hi Adi,

2. I assume you meant "longer than strictly needed for replication" here? Also, the concern I have is around the error code: today if the replication is not finished within the replication timeout, then the error code will be set accordingly when the request returns. Let's say the produce request is not satisfied after X (replication timeout) ms, but is satisfied after Y (throttling timeout) ms -- should we still set the error code or not? I think it is OK to just set NO_ERROR, but we need to document such cases clearly for quota actions mixed with acks = -1.

Guozhang

On Wed, Apr 15, 2015 at 4:23 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Thanks for the review Guozhang.

1. Agreed.

2. This proposal actually waits for the maximum of the 2 timeouts. This reduces implementation complexity at the cost of waiting longer than strictly needed for quotas. Note that this is only for the case where acks=-1. However, we can solve this if it is a significant concern by adding watcher keys for all partitions (only if acks=-1). These are the keys we would normally add while waiting for acknowledgements. We can change the tryComplete() function to return false until 'quota_timeout' time has elapsed AND all the acknowledgements have been received.

Thanks,
Aditya

From: Guozhang Wang [wangg...@gmail.com]
Sent: Wednesday, April 15, 2015 3:42 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Thanks for the summary. A few comments below:

1. Say a produce request has replication timeout X, and upon finishing the local append it is determined to be throttled Y ms where Y > X. Then after it has timed out in the purgatory after Y ms, we should still check whether the #.acks has been fulfilled, in order to set the correct error code in the response.

2. I think it is actually common that the calculated throttle time Y is less than the replication timeout X, which will be a tricky case since we need to make sure 1) the request is held in the purgatory for at least Y ms, 2) after Y ms have elapsed, if the #.acks has been fulfilled within X ms then set no-error-code and return immediately, 3) after X ms have elapsed, set timeout-error-code and return.

Guozhang

On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

This is an implementation proposal for delaying requests in quotas using the current purgatory. I'll discuss the usage for produce and fetch requests separately.

1. Delayed Produce Requests - Here, the proposal is basically to reuse DelayedProduce objects and insert them into the purgatory with no watcher keys if the request is being throttled. The timeout used in the request should be the Max(quota_delay_time, replication_timeout). In most cases, the quota timeout should be greater than the existing timeout, but in order to be safe we can use the maximum of these values. Having no watch keys will allow the operation to be enqueued directly into the timer and will not add any overhead in terms of watching keys (which was a concern). In this case, having watch keys is not beneficial since the operation must be delayed for a fixed amount of time and there is no possibility for the operation to complete before the timeout, i.e. tryComplete() can never return true before the timeout. On timeout, since the operation is a TimerTask, the timer will call run(), which calls onComplete(). In onComplete(), the DelayedProduce can repeat the check in tryComplete() (only if acks=-1: whether all replicas have fetched up to a certain offset) and return the response immediately. The code will be structured as follows in ReplicaManager.appendMessages():

  if (isThrottled) {
    val produce = new DelayedProduce(timeout)
    purgatory.tryCompleteElseWatch(produce, Seq())
  } else if (delayedRequestRequired()) {
    // insert into purgatory with watched keys for unthrottled requests
  }

In this proposal, we avoid adding unnecessary watches because there is no possibility of early completion, and this avoids any potential performance penalties we were concerned about earlier.

2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the DelayedFetch objects and insert them into the purgatory with no watcher keys if the request is throttled. The timeout used is the Max(quota_delay_time, max_wait_timeout). Having no watch keys provides the same benefits as described above. Upon timeout, onComplete() is called and the operation proceeds normally, i.e. performs a readFromLocalLog and returns a response. The caveat here is that if the request is throttled but the throttle time is less than the max_wait timeout on the fetch request, the request will be delayed to the Max(quota_delay_time, max_wait_timeout). This may be more than strictly necessary (since we are not watching for satisfaction on any keys). I added some testcases to DelayedOperationTest to verify that
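To make the purgatory mechanics above concrete, here is a minimal, self-contained Java sketch of the key idea: an operation inserted with no watcher keys can only complete when its timer fires, at which point onComplete() re-checks the acks condition exactly once. All names are illustrative; Kafka's actual purgatory is implemented in Scala with a different API.

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for a throttled DelayedProduce: with no watcher
// keys, completion can happen only when the timer expires.
abstract class ThrottledOperation {
    abstract boolean tryComplete();              // e.g. "have all replicas acked?"
    abstract void onComplete(boolean satisfied);

    void enqueue(ScheduledExecutorService timer, long delayMs) {
        // delayMs plays the role of Max(quota_delay_time, replication_timeout).
        timer.schedule(() -> onComplete(tryComplete()), delayMs, TimeUnit.MILLISECONDS);
    }
}

public class PurgatorySketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ThrottledOperation op = new ThrottledOperation() {
            boolean tryComplete() { return true; }   // pretend all acks arrived
            void onComplete(boolean satisfied) {
                System.out.println(satisfied ? "respond NO_ERROR" : "respond with timeout error");
            }
        };
        op.enqueue(timer, 100);   // throttle the response for 100 ms, then answer
        timer.shutdown();
        timer.awaitTermination(1, TimeUnit.SECONDS);
    }
}
{code}

Because nothing watches the operation, tryComplete() is evaluated only at expiry, which matches the behavior described above for the acks=-1 case.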
Re: Review Request 33071: Dynamically load JRE-specific class in TestPurgatoryPerformance
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33071/#review80342
---

Cool! It looks good to me.

- Yasuhiro Matsuda

On April 10, 2015, 8:45 a.m., Rajini Sivaram wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33071/
---

(Updated April 10, 2015, 8:45 a.m.)

Review request for kafka.

Bugs: KAFKA-2113
https://issues.apache.org/jira/browse/KAFKA-2113

Repository: kafka

Description
---
Patch for KAFKA-2113: Dynamically load JRE-specific class in TestPurgatoryPerformance

Diffs
---
core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala 962253a7260c90233d4a4c4fe8c75af211453f2a

Diff: https://reviews.apache.org/r/33071/diff/

Testing
---

Thanks,
Rajini Sivaram
[jira] [Commented] (KAFKA-2121) error handling in KafkaProducer constructor
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498282#comment-14498282 ]
Steven Zhen Wu commented on KAFKA-2121:

Updated reviewboard https://reviews.apache.org/r/33242/diff/ against branch apache/trunk

error handling in KafkaProducer constructor
Key: KAFKA-2121
URL: https://issues.apache.org/jira/browse/KAFKA-2121
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2, KAFKA-2121_2015-04-16_09:55:14.patch

On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:

It is a valid problem and we should correct it as soon as possible. I'm with Ewen regarding the solution.

On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

Steven,

Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue.

3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards-incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully, whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all.

So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called.) Second, add the try/catch as you suggested, but just use close().

-Ewen

On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:

Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in its constructor. Here is a code snippet of KafkaProducer to illustrate the problem.

---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // create metrics reporters via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
---

Let's say MyMetricsReporter creates a thread in its constructor. If hostname validation threw an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we created a thread leak issue. This becomes worse when we try to auto-recover (i.e. keep creating a KafkaProducer again -> failing again -> more thread leaks).

There are multiple options for fixing this.
1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem. What if some other lines throw an exception?
2) Use try-catch. In the catch section, try to call close methods for any non-null objects constructed so far.
3) Explicitly declare the dependencies in the constructor. This way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources: KafkaProducer(..., List<MetricsReporter> reporters). We don't have to use a dependency injection framework, but generally hiding dependencies is a bad coding practice. It is also hard to plug in mocks for dependencies. This is probably the most intrusive change.

I am willing to submit a patch, but would like to hear your opinions on how we should fix the issue.

Thanks,
Steven
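A minimal sketch of the "variant of 2" that Ewen recommends, i.e. a constructor that cleans up after itself; the class and method names here are hypothetical, not the actual KafkaProducer internals.

{code}
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

// Hypothetical config type standing in for ProducerConfig.
interface ClientConfig {
    List<Closeable> createReporters();    // may allocate threads
    void validateBootstrapServers();      // may throw
}

public class LeakFreeClient implements Closeable {
    // Non-final so close() can be called safely mid-construction.
    private List<Closeable> reporters;
    private Thread ioThread;

    public LeakFreeClient(ClientConfig config) {
        try {
            this.reporters = config.createReporters();
            config.validateBootstrapServers();
            this.ioThread = new Thread(() -> { /* network I/O loop */ });
            this.ioThread.start();
        } catch (Throwable t) {
            close();   // safe: every field is checked for null
            throw new RuntimeException("Failed to construct client", t);
        }
    }

    @Override
    public void close() {
        if (reporters != null)
            for (Closeable r : reporters)
                try { r.close(); } catch (IOException e) { /* log and continue */ }
        if (ioThread != null)
            ioThread.interrupt();
    }
}
{code}

Both properties Ewen asks for are present: close() tolerates partially-initialized state, and the constructor's catch block guarantees no resource outlives a failed construction.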
[jira] [Updated] (KAFKA-2121) error handling in KafkaProducer constructor
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Steven Zhen Wu updated KAFKA-2121:
Attachment: KAFKA-2121_2015-04-16_09:55:14.patch

error handling in KafkaProducer constructor
Key: KAFKA-2121
URL: https://issues.apache.org/jira/browse/KAFKA-2121
Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2, KAFKA-2121_2015-04-16_09:55:14.patch
Re: Review Request 33242: Patch for KAFKA-2121
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 4:55 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
add a unit test file
changes based on Ewen's review feedback

Diffs (updated)
---
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 5:03 p.m.)

Review request for kafka.

Changes
---
Address Ewen's review feedback:

- "I'm getting a bunch of checkstyle complaints when I try to test. These should all be easy to fix (and should be causing tests to fail before even running). The only rule that might not be obvious from the error message is that the static final field in MockMetricsReporter is expected to be all-caps since it looks like a constant to checkstyle." -- fixed.

- "In the constructor, could we throw some subclass of KafkaException instead? The new clients try to stick to that exception hierarchy except in a few special cases. Alternatively, maybe if we caught Error and RuntimeException instead of Throwable then we could just rethrow the same exception?" -- I changed RuntimeException to KafkaException. I can't think of a good subclass name for this scenario (ProducerConstructException?), hence staying with the generic KafkaException.

- "The new version of close() will swallow exceptions when called normally (i.e. not from the constructor). They'll be logged, but the caller won't see the exception anymore. Maybe we should save the first exception and rethrow it?" -- refactored into a private close(boolean swallowException) method.

- "Exception messages should be capitalized." -- fixed.

- "In the test, we should probably have an assert outside the catch. And is there any reason the closeCount is being reset to 0?" -- yes, we should have an assert outside the catch. I was just resetting the CLOSE_COUNT in case another test method needs to check the count.

Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
fix potential resource leak when KafkaProducer throws exception in the middle

Diffs
---
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
1. agreed
2. agree, new error
3. having discrete operations for tasks makes sense; combining them is confusing for users, I think. +1 for letting the user change only one thing at a time
4. let's be consistent both with the new code and the existing code. Let's not confuse the user, but give them the right error information so they know what they did wrong without much fuss.

~ Joe Stein
- - - - - - - - - - - - - - - - -
http://www.stealth.ly
- - - - - - - - - - - - - - - - -

On Wed, Apr 15, 2015 at 1:23 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Guys,

Thanks for the discussion! Summary:

1. Q: How can KAFKA-1367 (isr is inconsistent in brokers' metadata cache) affect the implementation?
A: We can fix this issue for the leading broker - the ReplicaManager (or Partition) component should have an accurate isr list, so with the leading broker having correct info, to do a describe-topic we will need to determine the leading brokers for the partitions and ask those for a correct isr list. Also, we should consider adding lag information to TMR for each follower for partition reassignment, as Jun suggested above.

2. Q: What if a user adds different alter commands for the same topic in scope of one batch request?
A: Because of the async nature of AlterTopicRequest it will be very hard then to assemble the expected result (in terms of checking whether the request is complete) if we let users do this. Also it will be very confusing. It was proposed not to let users do this (probably adding a new Error for such cases).

3. Q: AlterTopicRequest semantics: now that we have merged AlterTopic and ReassignPartitions, in which order should AlterTopic fields be resolved?
A: This item is not clear. There was a proposal to let the user change only one thing at a time, e.g. specify either new Replicas, or ReplicaAssignment (see the sketch after this message). This can be a simple solution, but it's a very strict rule. E.g. currently with TopicCommand a user can increase the number of partitions and define replica assignment for the newly added partitions. Taking into account item 2, it will be even harder for the user to achieve this.

4. Q: Do we need such accurate errors returned from the server: InvalidArgumentPartitions, InvalidArgumentReplicas etc.?
A: I started the implementation to add the proposed error codes and now I think InvalidArgumentError should probably be sufficient. We can do simple validations on the client side (e.g. AdminClient can ensure the number-of-partitions argument is positive); others, which can be covered only on the server (probably invalid topic config, replica assignment that includes a dead broker etc.), will be done on the server, and in case of an invalid argument we will return InvalidArgumentError without specifying the concrete field.

It'd be great if we could cover these remaining issues; it looks like they are minor, at least related to specific messages, not the overall protocol. I think with that I can update the confluence page and update the patch to reflect all discussed items. This patch will probably include wire protocol messages and server-side code to handle new requests. AdminClient and cli-tool implementation can be the next step.

Thanks,
Andrii Biletskyi

On Wed, Apr 15, 2015 at 7:26 PM, Jun Rao j...@confluent.io wrote:

Andrii,

500. I think what you suggested also sounds reasonable. Since ISR is only maintained accurately at the leader, TMR can return ISR if the broker is the leader of a partition. Otherwise, we can return an empty ISR. For partition reassignment, it would be useful to know the lag of each follower. Again, the leader knows this info. We can probably include that info in TMR as well.

300. I think it's probably reasonable to restrict AlterTopicRequest to change only one thing at a time, i.e., either partitions, replicas, replica assignment or config.

Thanks,
Jun

On Mon, Apr 13, 2015 at 10:56 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Jun,

404. Great, thanks!

500. If I understand correctly, KAFKA-1367 says the ISR part of TMR may be inconsistent. If so, then I believe all admin commands but describeTopic are not affected. Let me emphasize that it's about AdminClient operations, not about Wire Protocol requests. What I mean:
To verify AdminClient.createTopic we will need a (consistent) 'topics' set from TMR (we don't need isr).
To verify alterTopic - again, probably 'topics' and 'assigned replicas' + configs.
To verify deleteTopic - only 'topics'.
To verify preferredReplica - 'leader', 'assigned replicas'.
To verify reassignPartitions - 'assigned replicas'? (I'm not sure about this one.)
If everything above is correct, then AdminClient.describeTopic is the only command under risk. We can actually work around it - find out
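The "one change at a time" restriction that Jun and Joe endorse could be enforced with a simple server-side check. The sketch below is purely illustrative -- the field names are assumptions, since the KIP-4 wire format was still under discussion.

{code}
import java.util.List;
import java.util.Map;

// Assumed field names; not the actual KIP-4 wire format.
class AlterTopicRequest {
    Integer partitions;                      // new partition count, or null
    Integer replicas;                        // new replication factor, or null
    Map<Integer, List<Integer>> assignment;  // manual replica assignment, or null
    Map<String, String> config;              // config overrides, or null

    // "One change at a time": exactly one field may be set; otherwise the
    // server would answer with InvalidArgumentError (or a more specific code).
    boolean isValid() {
        int changes = 0;
        if (partitions != null) changes++;
        if (replicas != null) changes++;
        if (assignment != null) changes++;
        if (config != null) changes++;
        return changes == 1;
    }
}
{code}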
[DISCUSSION] KIP-11: ACL Management
Hi Kafka Authorization Fans,

I'm starting a new thread on a specific sub-topic of KIP-11, since this is a bit long :)

Currently KIP-11, as I understand it, proposes:
* Authorizers are pluggable, with Kafka providing DefaultAuthorizer.
* Kafka tools allow adding / managing ACLs.
* Those ACLs are stored in ZK and cached in a new TopicCache.
* Authorizers can either use the ACLs defined and stored in Kafka, or define and use their own.

I am concerned about two possible issues with this design:
1. Separation of concerns - only authorizers should worry about ACLs, and therefore the less code for ACLs that exists in Kafka core, the better.
2. User confusion - It sounded like we can define ACLs in Kafka itself but authorizers can also define their own, so kafka-topics --describe may show an ACL different than the one in use. This can be super confusing for admins.

My alternative suggestion:
* The Authorizer API will include: grantPrivilege(List<Principal>, List<Privilege>), revokePrivilege(List<Principal>, List<Privilege>), getPrivilegesByPrincipal(Principal, Resource) (the exact API can be discussed in detail, but you get the idea).
* Kafka tools will simply invoke these APIs when topics are added / modified / described.
* Each authorizer (including the default one) will be responsible for storing, caching and using those ACLs.

This way, we keep almost all ACL code with the Authorizer, where it belongs, and users get a nice unified interface that reflects what is actually getting used in the system. This is pretty much how Sqoop and Hive implement their authorization APIs.

What do you think?

Gwen
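Sketched as a Java interface, Gwen's suggestion might look like the following; the three method names come from the email, while the parameter and supporting types are assumptions for illustration.

{code}
import java.util.List;

// Supporting types are placeholders; only the three Authorizer methods
// are taken from the proposal.
interface Principal { String name(); }
interface Resource { String name(); }
interface Privilege { String action(); Resource resource(); }

interface Authorizer {
    void grantPrivilege(List<Principal> principals, List<Privilege> privileges);
    void revokePrivilege(List<Principal> principals, List<Privilege> privileges);
    // Called by tools such as kafka-topics --describe, so what is displayed
    // always reflects what the authorizer actually enforces.
    List<Privilege> getPrivilegesByPrincipal(Principal principal, Resource resource);
}
{code}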
[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498307#comment-14498307 ]
alexcb commented on KAFKA-2098:

Thanks for the notes everyone. gradlew (the gradle wrapper) is designed to allow users who do not have gradle set up to build the project. If you require gradle to be set up ahead of time, then it seems like one could just use gradle directly. For example:

{noformat}
[root@feb6842638b9 kafkatest]# tar xzf kafka-0.8.2.1-src.tgz
[root@feb6842638b9 kafkatest]# cd kafka-0.8.2.1-src
[root@feb6842638b9 kafka-0.8.2.1-src]# rm gradlew gradlew.bat    *** me simulating removing these files from the release
[root@feb6842638b9 kafka-0.8.2.1-src]# gradle jar
... output removed ...
:core:processResources UP-TO-DATE
:core:classes
:core:copyDependantLibs
:core:jar
:examples:compileJava
:examples:processResources UP-TO-DATE
:examples:classes
:examples:jar
:contrib:hadoop-consumer:compileJava
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:contrib:hadoop-consumer:processResources UP-TO-DATE
:contrib:hadoop-consumer:classes
:contrib:hadoop-consumer:jar
:contrib:hadoop-producer:compileJava
:contrib:hadoop-producer:processResources UP-TO-DATE
:contrib:hadoop-producer:classes
:contrib:hadoop-producer:jar

BUILD SUCCESSFUL
{noformat}

By doing this, we 1) avoid the confusion of distributing a non-working gradle wrapper, and 2) bypass the wrapper and just use gradle directly. Thanks.

Gradle Wrapper Jar gone missing in 0.8.2.1
Key: KAFKA-2098
URL: https://issues.apache.org/jira/browse/KAFKA-2098
[jira] [Updated] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.
[ https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sasaki Toru updated KAFKA-2128:
Attachment: KAFKA-2128-1.patch

Uploaded a patch to improve this problem.

kafka.Kafka should return non-zero exit code when caught exception.
Key: KAFKA-2128
URL: https://issues.apache.org/jira/browse/KAFKA-2128
[jira] [Created] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.
Sasaki Toru created KAFKA-2128:

Summary: kafka.Kafka should return non-zero exit code when caught exception.
Key: KAFKA-2128
URL: https://issues.apache.org/jira/browse/KAFKA-2128
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.8.3
Reporter: Sasaki Toru
Priority: Minor
Fix For: 0.8.3

The kafka.Kafka object always returns exit code zero. It should return a non-zero exit code when an exception is caught (for example, a FileNotFoundException because server.properties does not exist).
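The proposed change amounts to wrapping the startup path in a try/catch that exits non-zero. The real entry point is the Scala object kafka.Kafka; the sketch below shows the same pattern in Java, with all names hypothetical.

{code}
// Hypothetical Java illustration of the proposed fix; the real entry point
// is the Scala object kafka.Kafka.
public final class ServerMain {

    private static void startServer(String configPath) throws Exception {
        // stand-in for loading server.properties and starting the broker;
        // throws FileNotFoundException if the config file does not exist
    }

    public static void main(String[] args) {
        try {
            startServer(args.length > 0 ? args[0] : "server.properties");
        } catch (Exception e) {
            System.err.println("Fatal error during broker startup: " + e);
            System.exit(1);   // non-zero instead of silently exiting 0
        }
    }
}
{code}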
[jira] [Updated] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.
[ https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sasaki Toru updated KAFKA-2128:
Status: Patch Available (was: Open)

kafka.Kafka should return non-zero exit code when caught exception.
Key: KAFKA-2128
URL: https://issues.apache.org/jira/browse/KAFKA-2128
Re: Review Request 33242: Patch for KAFKA-2121
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 5:44 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
add a unit test file
changes based on Ewen's review feedback
fix capitalization in error log

Diffs (updated)
---
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hi Harsha,

Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch is sitting in the accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply the timeout at the batch level to merge those two cases, as Ewen suggested. But I'm not sure if it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place.

Jiangjie (Becket) Qin

On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Guozhang and Jiangjie,
Isn't this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788? Can you please review the patch there?
Thanks,
Harsha

On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote:

Thanks for the update Jiangjie,

I think it is actually NOT expected that hardware disconnection will be detected by the selector; rather it will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki:

1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout." I am not very clear on what this means.

2. Currently the producer already has a TIMEOUT_CONFIG, which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names, but it will reduce confusion moving forward.

Guozhang

On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we are depending on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However, the Selector.disconnected set is only updated when:
1. A write/read/connect to the channel failed.
2. A Key is canceled.
However, when a broker goes down before it sends back the response, the client seems not to be able to detect this failure. I did a simple test below:
1. Run a selector on one machine and an echo server on another machine. Connect the selector to the echo server.
2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds.
3. After the server received the message, unplug the cable on the echo server.
4. After waiting for 45 min, the selector still had not detected the network failure.
lsof on the selector machine shows that the TCP connection is still considered ESTABLISHED.

I'm not sure in this case what we should expect from the java.nio.channels.Selector. According to the documentation, the selector does not verify the status of the associated channel. In my test case it looks even worse in that the OS did not consider the socket to be disconnected.

Anyway, it seems adding the client-side request timeout is necessary (see the sketch after this message). I've updated the KIP page to clarify the problem we want to solve according to Ewen's comments.

Thanks.

Jiangjie (Becket) Qin

On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Hi Ewen, thanks for the comments. Very good points! Please see replies inline.

On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

Jiangjie,

Great start. I have a couple of comments.

Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context.

Yes, when the broker is completely down the request should be cleared as you said. The case we encountered looks like the broker was just not responding but the TCP connection was still alive though.

Ok, that makes sense.

My second question is about whether this is the right level to tackle the issue / what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem or any problems caused by lack of connectivity, since this would only apply to in-flight requests, which by definition must have been sent on an active connection. I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even
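A minimal sketch of the client-side request timeout under discussion: record the send time of every in-flight request and fail any request that outlives requestTimeoutMs, independently of TCP's own (much longer) timeouts. The names are illustrative, not NetworkClient's actual fields.

{code}
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative in-flight request expiry check; not NetworkClient's actual code.
class InFlightRequests {
    static final class Request {
        final long sendTimeMs;
        Request(long sendTimeMs) { this.sendTimeMs = sendTimeMs; }
    }

    private final Deque<Request> inFlight = new ArrayDeque<>();
    private final long requestTimeoutMs;

    InFlightRequests(long requestTimeoutMs) { this.requestTimeoutMs = requestTimeoutMs; }

    void send(long nowMs) { inFlight.add(new Request(nowMs)); }

    // Called from each poll(): a hung broker that keeps the TCP connection
    // alive never triggers a selector event, so the client must check
    // request age itself.
    void expireOldRequests(long nowMs) {
        while (!inFlight.isEmpty() && nowMs - inFlight.peekFirst().sendTimeMs > requestTimeoutMs) {
            Request expired = inFlight.pollFirst();
            // fail the request and mark the connection for disconnect
            System.out.println("Request timed out after " + (nowMs - expired.sendTimeMs) + " ms");
        }
    }
}
{code}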
[jira] [Updated] (KAFKA-2113) TestPurgatoryPerformance does not compile using IBM JDK
[ https://issues.apache.org/jira/browse/KAFKA-2113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Rao updated KAFKA-2113:
Resolution: Fixed
Fix Version/s: 0.8.3
Status: Resolved (was: Patch Available)

Thanks for the patch. +1 and committed to trunk.

TestPurgatoryPerformance does not compile using IBM JDK
Key: KAFKA-2113
URL: https://issues.apache.org/jira/browse/KAFKA-2113
Project: Kafka
Issue Type: Bug
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 0.8.3
Attachments: KAFKA-2113.patch

TestPurgatoryPerformance uses a class from the com.sun package that is not available in the IBM JDK. The code does handle the class-not-found exception at runtime if run with an IBM JRE (and prints out -1 as CPU time). But as a result of the direct reference to the com.sun.management.OperatingSystemMXBean class, the Kafka core project no longer compiles with an IBM JDK.
{quote}
:core:compileTestScala
/kafka/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala:88: type OperatingSystemMXBean is not a member of package com.sun.management
Some(ManagementFactory.getOperatingSystemMXBean().asInstanceOf[com.sun.management.OperatingSystemMXBean])
^
one error found
FAILED
{quote}
The JRE-specific class should be dynamically loaded to enable the class to be compiled as well as run with any JDK.
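The dynamic-loading fix works roughly like this: instead of a compile-time reference to com.sun.management.OperatingSystemMXBean, resolve the class by name at runtime and fall back gracefully when it is absent. The actual change is in Scala; this Java sketch illustrates the same approach.

{code}
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;

// Sketch of the dynamic-loading approach: resolve the com.sun class
// reflectively so the code still compiles on JDKs (e.g. IBM's) that
// don't ship it.
public class CpuTime {
    public static long processCpuTimeNanos() {
        try {
            Class<?> sunOsBean = Class.forName("com.sun.management.OperatingSystemMXBean");
            Object osBean = ManagementFactory.getOperatingSystemMXBean();
            if (sunOsBean.isInstance(osBean)) {
                Method m = sunOsBean.getMethod("getProcessCpuTime");
                return (Long) m.invoke(osBean);
            }
        } catch (ReflectiveOperationException e) {
            // class or method unavailable on this JRE
        }
        return -1L;   // mirrors the test's "-1 as CPU time" fallback
    }

    public static void main(String[] args) {
        System.out.println("process CPU time: " + processCpuTimeNanos() + " ns");
    }
}
{code}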
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498340#comment-14498340 ]
Suresh Srinivas commented on KAFKA-2120:

KAFKA-1788, which has been in the works for some time now, addresses this, right?

Add a request timeout to NetworkClient
Key: KAFKA-2120
URL: https://issues.apache.org/jira/browse/KAFKA-2120
Project: Kafka
Issue Type: New Feature
Reporter: Jiangjie Qin
Assignee: Mayuresh Gharat

Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request, due to reasons such as the broker being down, the request will never be completed. The request timeout will also be used as an implicit timeout for some methods such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 was created for this public interface change: https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
Re: Review Request 33242: Patch for KAFKA-2121
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80346
---

Looks good, left a few comments. KafkaConsumer suffers from this same problem. Patching that should be pretty much identical -- any chance you could extend this to cover that as well?

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/33242/#comment130187
This code is all single threaded; is the AtomicReference really necessary here?

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/33242/#comment130194
Minor, but these messages should be cleaned up -- just needs some capitalization.

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/33242/#comment130195
One idea for making this less verbose and redundant: make all of these classes implement Closeable so we can just write one utility method for trying to close something and catching the exception.

- Ewen Cheslack-Postava

On April 16, 2015, 5:03 p.m., Steven Wu wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 5:03 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description
---
fix potential resource leak when KafkaProducer throws exception in the middle

Diffs
---
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
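Combining two of the suggestions above (a single close-and-catch utility, plus remembering the first failure), such a helper might look like this; closeQuietly is an assumed name, and the eventual Kafka code may differ.

{code}
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicReference;

// Assumed-name sketch of the suggested utility: close anything Closeable,
// swallow and log the error, but remember the first failure so the caller
// can decide whether to rethrow it.
final class CloseUtils {
    private CloseUtils() {}

    static void closeQuietly(Closeable closeable, String name, AtomicReference<Throwable> firstException) {
        if (closeable == null)
            return;
        try {
            closeable.close();
        } catch (Throwable t) {
            firstException.compareAndSet(null, t);   // keep only the first failure
            System.err.println("Failed to close " + name + ": " + t);
        }
    }
}
{code}

A constructor's cleanup path can then call closeQuietly on each field in turn and, outside the constructor case, rethrow firstException.get() when it is non-null.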
[jira] [Updated] (KAFKA-2121) error handling in KafkaProducer constructor
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Steven Zhen Wu updated KAFKA-2121:
Attachment: KAFKA-2121_2015-04-16_10:43:55.patch

error handling in KafkaProducer constructor
Key: KAFKA-2121
URL: https://issues.apache.org/jira/browse/KAFKA-2121
Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch
Re: Review Request 33242: Patch for KAFKA-2121
On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 531
https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line531
This code is all single threaded; is the AtomicReference really necessary here?

Not really necessary, just trying to use compareAndSet. Otherwise, I would need to do if (firstException == null) firstException = t. I can certainly change it; let me know.

On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 548
https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line548
One idea for making this less verbose and redundant: make all of these classes implement Closeable so we can just write one utility method for trying to close something and catching the exception.

Yes, I thought about it. It may break binary compatibility, e.g. for Serializer. The Sender and Metrics classes are probably only used internally. Let me know your thoughts.

- Steven

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80346
---

On April 16, 2015, 5:44 p.m., Steven Wu wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 5:44 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Diff: https://reviews.apache.org/r/33242/diff/

Thanks,
Steven Wu
[jira] [Commented] (KAFKA-2121) error handling in KafkaProducer constructor
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498371#comment-14498371 ] Steven Zhen Wu commented on KAFKA-2121: --- Updated reviewboard https://reviews.apache.org/r/33242/diff/ against branch apache/trunk error handling in KafkaProducer constructor --- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating a the producer would have be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that let you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when 0.8.2 java KafkaProducer failed in constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, SerializerK keySerializer, SerializerV valueSerializer) { // create metrcis reporter via reflection ListMetricsReporter reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers ListInetSocketAddress addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in constructor. if hostname validation threw an exception, constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we created thread leak issue. 
this becomes worse when we try to auto-recover (i.e. keep creating KafkaProducer again - failing again - more thread leaks). there are multiple options for fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom. it doesn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer threw an exception, I can call the close method of metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter> reporters) we don't have to use a dependency injection framework. but generally hiding dependencies is a bad coding practice. it is also hard to plug in mocks for dependencies. this is probably the most intrusive change. I am willing to submit a patch. but like to hear your opinions on how we should fix the issue. Thanks, Steven
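For illustration, here is a minimal, self-contained Java sketch of option (2): track everything constructed so far and close it if any later step throws. The class and resource names below are hypothetical placeholders, not KafkaProducer's actual fields:
{code}
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Sketch of option (2): a failed constructor releases everything it
// already created, so repeated construct-fail cycles cannot leak threads.
public class LeakFreeConstruction implements Closeable {
    private final List<Closeable> owned = new ArrayList<Closeable>();

    public LeakFreeConstruction() {
        try {
            owned.add(new MetricsReporterStub());  // stands in for the metrics reporters
            owned.add(new IoThreadStub());         // stands in for the sender IO thread
            validateBootstrapServers();            // may throw, like hostname validation
        } catch (RuntimeException e) {
            close();                               // safe on partially-built state
            throw e;
        }
    }

    private void validateBootstrapServers() {
        // placeholder for ClientUtils.parseAndValidateAddresses(...)
    }

    @Override
    public void close() {
        for (Closeable c : owned) {
            try {
                c.close();
            } catch (Exception e) {
                // log and keep closing the rest
            }
        }
    }

    private static class MetricsReporterStub implements Closeable {
        public void close() { /* stop the reporter thread */ }
    }

    private static class IoThreadStub implements Closeable {
        public void close() { /* interrupt and join the IO thread */ }
    }
}
{code}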
Re: [DISCUSS] New consumer offset commit API
Hey Ewen, This makes sense. People usually do not want to stop consuming when committing offsets. One corner case I am thinking about for async commit with retries is that two offset commits could interleave with each other, and that might create problems. Like you said, maybe we can cancel the previous one. Another question is whether the future mechanism will only be applied to auto commit or will also be used in manual commit? Because in the new consumer we allow the user to provide an offset map for offset commit. Simply canceling a previous pending offset commit does not seem ideal in this case because the two commits could be for different partitions. Thanks. Jiangjie (Becket) Qin On 4/14/15, 4:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote: I'd like to get some feedback on changing the offset commit API in the new consumer. Since this is user-facing API I wanted to make sure this gets better visibility than the JIRA ( https://issues.apache.org/jira/browse/KAFKA-2123) might. The motivation is to make it possible to do async commits but be able to tell when the commit completes/fails. I'm suggesting changing the API from void commit(Map<TopicPartition, Long> offsets, CommitType type) to Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); which matches the approach used for the producer. The ConsumerCommitCallback only has one method: public void onCompletion(Exception exception); This enables a few different use cases: * Blocking commit via Future.get(), and blocking with timeouts via Future.get(long, TimeUnit) * See exceptions via the future (see discussion of retries below) * Callback-based notification so you can keep processing messages and only take action if something goes wrong, takes too long, etc. This is the use case that motivated the change. * Fire and forget commits via a shorthand commit() API, ignoring the resulting future. One big difference between this and the producer API is that there isn't any result (except maybe an exception) from commitOffsets. This leads to the somewhat awkward Future<Void> signature. I personally prefer that to the sync/async flag, especially since it also provides a non-blocking interface for checking whether the commit is complete. I posted a WIP patch to the JIRA. In the process of making it I found a few issues that might be worth discussing: 1. Retries. In the old approach, this was trivial since it only applied to synchronous calls, so we could just loop until the request was successful. Do we want to start introducing a retries mechanism here, and should it apply to all types of requests or are we going to end up with a couple of different retry settings for specific cases, like offset commit? The WIP patch allows errors to bubble up through the Future on the first failure, which right now can cause some tests to fail transiently (e.g. the consumer bounce test). I think some sort of retry mechanism, even if it's an internal constant rather than configurable, is probably the right solution, but I want to figure out how broadly they should apply. I think adding them only for offset commits isn't hard. 2. The Future implementation is a bit weird because the consumer doesn't have a dedicated IO thread. My only concern is that this could lead to some unintuitive results based on the current implementation, because the way this works is to just run poll() in the thread calling Future.get(), but it mutes all non-coordinator nodes, which means other processing is effectively paused.
If you're processing offset commits in a separate thread from your main consumer thread that's calling poll(), you might just end up blocking the main thread while waiting on the Future. Then again, I'm not sure the other nodes really even need to be muted -- maybe Jay or Guozhang have ideas on this? 3. Should the future be cancellable? This probably isn't hard to implement, but I'm not sure we should even bother. On the one hand it could be nice, especially if you have an old commit request that you want superseded by a new one with updated offsets. On the other hand, if the request has already been sent out, cancelling it won't accomplish anything. I think the only case this is useful is when there are retries. Thoughts? -- Thanks, Ewen
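For concreteness, the proposed shape could be sketched as below. ConsumerCommitCallback and the commit() signature come from the message above; OffsetCommitter and the usage helpers are hypothetical scaffolding for illustration only:
{code}
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;

// Callback from the proposal: exception is null when the commit succeeded.
interface ConsumerCommitCallback {
    void onCompletion(Exception exception);
}

// Hypothetical interface carrying the proposed signature (not the real consumer).
interface OffsetCommitter {
    Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);
}

class CommitExamples {
    // Blocking commit with a timeout -- one of the use cases listed above.
    static void commitSync(OffsetCommitter c, Map<TopicPartition, Long> offsets) throws Exception {
        c.commit(offsets, null).get(30, TimeUnit.SECONDS);
    }

    // Fire-and-forget: ignore the future, react only in the callback.
    static void commitAsync(OffsetCommitter c, Map<TopicPartition, Long> offsets) {
        c.commit(offsets, new ConsumerCommitCallback() {
            public void onCompletion(Exception exception) {
                if (exception != null) {
                    // e.g. log and keep processing messages
                }
            }
        });
    }
}
{code}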
[jira] [Updated] (KAFKA-2121) error handling in KafkaProducer constructor
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated KAFKA-2121: -- Attachment: (was: KAFKA-2121.patch.2) error handling in KafkaProducer constructor --- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498750#comment-14498750 ] alexcb commented on KAFKA-2098: --- Perhaps I was unclear in what I was suggesting. Since [~gwenshap] mentioned that we don't want jars in the release, which I completely agree with, then we should also remove the auto-generated gradlew and gradlew.bat scripts. When gradlew is run, it hardcodes the CLASSPATH to ./gradle/wrapper/gradle-wrapper.jar; and I don't see any mechanism in the script to include a user-defined CLASSPATH. I fail to see how it's even possible to run gradlew in its current state without running gradle first. Can you elaborate on how it's possible to use the gradlew script without running gradle first? i.e. how does it run if ./gradle/wrapper/gradle-wrapper.jar does not exist. Gradle Wrapper Jar gone missing in 0.8.2.1 -- Key: KAFKA-2098 URL: https://issues.apache.org/jira/browse/KAFKA-2098 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Rekha Joshi Assignee: Rekha Joshi ./gradlew idea Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain This was working in 0.8.2. Attaching patch. Thanks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498774#comment-14498774 ] Ewen Cheslack-Postava commented on KAFKA-2098: -- Sorry @alexcb, I think we were just talking past each other. Yes, I agree that having the wrapper scripts is pointless without the jars since you need gradle anyway. I was just pointing out that even if you require another gradle installation to bootstrap, we might still want to rely on the wrapper. For example, I'm not sure we want to change the README to change all the gradlew commands to use gradle directly like you demonstrated above. Using an existing gradle installation only to bootstrap the wrapper and then using the wrapper for everything can remove a bunch of compatibility issues. Gradle Wrapper Jar gone missing in 0.8.2.1 -- Key: KAFKA-2098 URL: https://issues.apache.org/jira/browse/KAFKA-2098 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Rekha Joshi Assignee: Rekha Joshi ./gradlew idea Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain This was working in 0.8.2.Attaching patch.Thanks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
RE: [KIP-DISCUSSION] KIP-13 Quotas
Hey Guozhang, I don't think we should return an error if the request is satisfied after Y (throttling timeout) because it may cause the producer to think that the request was not ack'ed at all. Aditya From: Guozhang Wang [wangg...@gmail.com] Sent: Thursday, April 16, 2015 9:06 AM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Hi Adi, 2. I assume you were saying 'longer than strictly needed for replication' here? Also the concern I have is around the error code: today if the replication is not finished within the replication timeout then the error code will be set accordingly when it returns. Let's say the produce request is not satisfied after X (replication timeout) ms, but is satisfied after Y (throttling timeout); should we still set the error code or not? I think it is OK to just set NO_ERROR but we need to document such cases clearly for quota actions mixed with acks = -1. Guozhang On Wed, Apr 15, 2015 at 4:23 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Thanks for the review Guozhang. 1. Agreed. 2. This proposal actually waits for the maximum of the 2 timeouts. This reduces implementation complexity at the cost of waiting longer than strictly needed for quotas. Note that this is only for the case where acks=-1. However we can solve this if it is a significant concern by adding watcher keys for all partitions (only if acks=-1). These are the keys we would normally add while waiting for acknowledgements. We can change the tryComplete() function to return false until 'quota_timeout' time has elapsed AND all the acknowledgements have been received. Thanks, Aditya From: Guozhang Wang [wangg...@gmail.com] Sent: Wednesday, April 15, 2015 3:42 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Thanks for the summary. A few comments below: 1. Say a produce request has replication timeout X, and upon finishing the local append it is determined to be throttled Y ms where Y > X; then after it has timed out in the purgatory after Y ms we should still check whether the #.acks has been fulfilled in order to set the correct error codes in the response. 2. I think it is actually common that the calculated throttle time Y is less than the replication timeout X, which will be a tricky case since we need to make sure 1) the request is held in the purgatory for at least Y ms, 2) after Y ms elapsed, if the #.acks has been fulfilled within X ms then set no-error-code and return immediately, 3) after X ms elapsed, set timeout-error-code and return. Guozhang On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: This is an implementation proposal for delaying requests in quotas using the current purgatory. I'll discuss the usage for produce and fetch requests separately. 1. Delayed Produce Requests - Here, the proposal is basically to reuse DelayedProduce objects and insert them into the purgatory with no watcher keys if the request is being throttled. The timeout used in the request should be the Max(quota_delay_time, replication_timeout). In most cases, the quota timeout should be greater than the existing timeout but in order to be safe, we can use the maximum of these values. Having no watch keys will allow the operation to be enqueued directly into the timer and will not add any overhead in terms of watching keys (which was a concern). In this case, having watch keys is not beneficial since the operation must be delayed for a fixed amount of time and there is no possibility for the operation to complete before the timeout i.e.
tryComplete() can never return true before the timeout. On timeout, since the operation is a TimerTask, the timer will call run() which calls onComplete(). In onComplete, the DelayedProduce can repeat the check in tryComplete() (only if acks=-1, whether all replicas fetched up to a certain offset) and return the response immediately. Code will be structured as follows in ReplicaManager:appendMessages() if(isThrottled) { fetch = new DelayedProduce(timeout) purgatory.tryCompleteElseWatch(fetch, Seq()) } else if(delayedRequestRequired()) { // Insert into purgatory with watched keys for unthrottled requests } In this proposal, we avoid adding unnecessary watches because there is no possibility of early completion and this avoids any potential performance penalties we were concerned about earlier. 2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the DelayedFetch objects and insert them into the purgatory with no watcher keys if the request is throttled. Timeout used is the Max(quota_delay_time, max_wait_timeout). Having no watch keys provides the same benefits as described above. Upon timeout, the onComplete() is called and the operation proceeds normally i.e. perform a readFromLocalLog and
Re: [KIP-DISCUSSION] KIP-13 Quotas
The quota check for the fetch request is a bit different from the produce request. I assume that for the fetch request, we will first get an estimated fetch response size to do the quota check. There are two things to think about. First, when we actually send the response, we probably don't want to record the metric again since it will double count. Second, the bytes that the fetch response actually sends could be more than the estimate. This means that the metric may not be 100% accurate. We may be able to limit the fetch size of each partition to what's in the original estimate. For the produce request, I was thinking that another way to do this is to first figure out the quota_timeout. Then wait in Purgatory for quota_timeout with no key. If the request is not satisfied in quota_timeout and (request_timeout > quota_timeout), wait in Purgatory for (request_timeout - quota_timeout) with the original keys. Thanks, Jun On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: This is an implementation proposal for delaying requests in quotas using the current purgatory. I'll discuss the usage for produce and fetch requests separately. 1. Delayed Produce Requests - Here, the proposal is basically to reuse DelayedProduce objects and insert them into the purgatory with no watcher keys if the request is being throttled. The timeout used in the request should be the Max(quota_delay_time, replication_timeout). In most cases, the quota timeout should be greater than the existing timeout but in order to be safe, we can use the maximum of these values. Having no watch keys will allow the operation to be enqueued directly into the timer and will not add any overhead in terms of watching keys (which was a concern). In this case, having watch keys is not beneficial since the operation must be delayed for a fixed amount of time and there is no possibility for the operation to complete before the timeout i.e. tryComplete() can never return true before the timeout. On timeout, since the operation is a TimerTask, the timer will call run() which calls onComplete(). In onComplete, the DelayedProduce can repeat the check in tryComplete() (only if acks=-1, whether all replicas fetched up to a certain offset) and return the response immediately. Code will be structured as follows in ReplicaManager:appendMessages() if(isThrottled) { fetch = new DelayedProduce(timeout) purgatory.tryCompleteElseWatch(fetch, Seq()) } else if(delayedRequestRequired()) { // Insert into purgatory with watched keys for unthrottled requests } In this proposal, we avoid adding unnecessary watches because there is no possibility of early completion and this avoids any potential performance penalties we were concerned about earlier. 2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the DelayedFetch objects and insert them into the purgatory with no watcher keys if the request is throttled. Timeout used is the Max(quota_delay_time, max_wait_timeout). Having no watch keys provides the same benefits as described above. Upon timeout, the onComplete() is called and the operation proceeds normally i.e. perform a readFromLocalLog and return a response. The caveat here is that if the request is throttled but the throttle time is less than the max_wait timeout on the fetch request, the request will be delayed up to Max(quota_delay_time, max_wait_timeout). This may be more than strictly necessary (since we are not watching for satisfaction on any keys).
I added some test cases to DelayedOperationTest to verify that it is possible to schedule operations with no watcher keys. By inserting elements with no watch keys, the purgatory simply becomes a delay queue. It may also make sense to add a new API to the purgatory called delayFor() that basically accepts an operation without any watch keys (Thanks for the suggestion Joel). Thoughts? Thanks, Aditya From: Guozhang Wang [wangg...@gmail.com] Sent: Monday, April 13, 2015 7:27 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas I think KAFKA-2063 (bounding fetch response) is still under discussion, and may not make it in time to go in with KAFKA-1927. On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: I think it's reasonable to batch the protocol changes together. In addition to the protocol changes, is someone actively driving the server side changes/KIP process for KAFKA-2063? Thanks, Aditya From: Jun Rao [j...@confluent.io] Sent: Thursday, April 09, 2015 8:59 AM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Since we are also thinking about evolving the fetch request protocol in KAFKA-2063 (bound fetch response size), perhaps it's worth thinking through if we can just evolve the protocol once. Thanks,
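The suggested delayFor() amounts to something like the following sketch. Purgatory here is a placeholder mirroring the broker's (Scala) purgatory contract, not its real API:
{code}
import java.util.Collections;
import java.util.List;

// Placeholder for the purgatory contract: an operation completes early only
// if one of its watch keys is satisfied; otherwise the timer fires
// onComplete() when the operation's timeout expires.
interface Purgatory<T> {
    boolean tryCompleteElseWatch(T operation, List<Object> watchKeys);
}

class QuotaDelays {
    // Proposed delayFor(): with no watch keys there is no early-completion
    // path, so the purgatory degenerates into a plain delay queue.
    static <T> void delayFor(Purgatory<T> purgatory, T operation) {
        purgatory.tryCompleteElseWatch(operation, Collections.emptyList());
    }
}
{code}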
Re: Review Request 33242: Patch for KAFKA-2121
On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote: Looks good, left a few comments. KafkaConsumer suffers from this same problem. Patching that should be pretty much identical -- any chance you could extend this to cover that as well? sure. I can extend this to KafkaConsumer later. - Steven --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80346 --- On April 16, 2015, 5:44 p.m., Steven Wu wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ --- (Updated April 16, 2015, 5:44 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- add a unit test file changes based on Ewen's review feedbacks fix capitalization in error log Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION Diff: https://reviews.apache.org/r/33242/diff/ Testing --- Thanks, Steven Wu
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hi Guozhang, By implicit timeout for close() and flush(), I meant that currently we don’t have an explicit timeout for close() or flush() when a broker is down, so they can take pretty long, up to the TCP timeout, which is hours as you mentioned. With the client-side request timeout, the waiting time would be bounded by the request timeout. And I agree we’d better change the TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG to avoid confusion. Thanks. Jiangjie (Becket) Qin On 4/15/15, 10:38 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks for the update Jiangjie, I think it is actually NOT expected that hardware disconnection will be detected by the selector, but rather will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki: 1. For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout. I am not very clear on what this means? 2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names but will reduce confusion moving forward. Guozhang On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we are depending on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However, the Selector.disconnected set is only updated when: 1. A write/read/connect to the channel failed. 2. A key is canceled. However, when a broker goes down before it sends back the response, the client seems unable to detect this failure. I did a simple test: 1. Run a selector on one machine and an echo server on another machine. Connect the selector to the echo server. 2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds. 3. After the server received the message, unplug the cable on the echo server. 4. After waiting for 45 min, the selector still had not detected the network failure. lsof on the selector machine shows that the TCP connection is still considered ESTABLISHED. I’m not sure what we should expect from the java.nio.channels.Selector in this case. According to the documentation, the selector does not verify the status of the associated channel. In my test case it looks even worse: the OS did not consider the socket disconnected either. Anyway, it seems adding the client-side request timeout is necessary. I’ve updated the KIP page to clarify the problem we want to solve according to Ewen’s comments. Thanks. Jiangjie (Becket) Qin On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote: On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Ewen, thanks for the comments. Very good points! Please see replies inline. On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Jiangjie, Great start. I have a couple of comments. Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context. Yes, when the broker is completely down the request should be cleared as you said.
The case we encountered looks like the broker was just not responding but TCP connection was still alive though. Ok, that makes sense. My second question is about whether this is the right level to tackle the issue/what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem or any problems caused by lack of connectivity since this would only apply to in flight requests, which by definition must have been sent on an active connection. I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout? That is a very good point. We have three
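A sketch of what the proposed client-side request timeout boils down to: remember when each in-flight request was sent and expire it from poll(), even though the socket still looks ESTABLISHED. InFlightRequest and the method names are placeholders, not NetworkClient's actual internals:
{code}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeoutException;

class InFlightTracker {
    private final long requestTimeoutMs;
    // Oldest request first; responses on a connection arrive in send order.
    private final Deque<InFlightRequest> inFlight = new ArrayDeque<InFlightRequest>();

    InFlightTracker(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    void onSend(InFlightRequest request) {
        inFlight.addLast(request);
    }

    // Called from poll(): fail any request whose response never arrived,
    // bounding close()/flush() even when the broker silently disappears.
    void expireOverdue(long nowMs) {
        while (!inFlight.isEmpty()
                && nowMs - inFlight.peekFirst().sendTimeMs > requestTimeoutMs) {
            InFlightRequest expired = inFlight.pollFirst();
            expired.fail(new TimeoutException("no response within " + requestTimeoutMs + " ms"));
        }
    }

    static class InFlightRequest {
        final long sendTimeMs;

        InFlightRequest(long sendTimeMs) {
            this.sendTimeMs = sendTimeMs;
        }

        void fail(Exception e) {
            // placeholder: complete the request exceptionally, close the connection
        }
    }
}
{code}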
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
1. For the lags, we can add a new field lags per partition. It will return, for each replica that's not in the ISR, the replica id and the lag in messages. Also, if TMR is sent to a non-leader, the response can just include an empty array for isr and lags. 2. So, we will just return a topic-level error for the duplicated topics, right? 3. Yes, it's true that today one can specify both partitions and replicaAssignment in the TopicCommand. However, partitions is actually ignored. So, it will be clearer if we don't allow users to do this. 4. How many specific error codes like InvalidPartitions and InvalidReplicas are needed? If it's not that many, giving out more specific errors will be useful for non-java clients. Thanks, Jun On Wed, Apr 15, 2015 at 10:23 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Guys, Thanks for the discussion! Summary: 1. Q: How can KAFKA-1367 (isr is inconsistent in brokers' metadata cache) affect the implementation? A: We can fix this issue for the leading broker - the ReplicaManager (or Partition) component should have an accurate isr list; then, with the leading broker having correct info, to do a describe-topic we will need to determine the leading brokers for partitions and ask those for a correct isr list. Also, we should consider adding lag information to TMR for each follower for partition reassignment, as Jun suggested above. 2. Q: What if a user adds different alter commands for the same topic in the scope of one batch request? A: Because of the async nature of AlterTopicRequest it will be very hard then to assemble the expected result (in terms of checking whether the request is complete) if we let users do this. Also it will be very confusing. It was proposed not to let users do this (probably add a new Error for such cases). 3. Q: AlterTopicRequest semantics: now that we merged AlterTopic and ReassignPartitions, in which order should AlterTopic fields be resolved? A: This item is not clear. There was a proposal to let the user change only one thing at a time, e.g. specify either new Replicas, or ReplicaAssignment. This can be a simple solution, but it's a very strict rule. E.g. currently with TopicCommand the user can increase the number of partitions and define replica assignment for newly added partitions. Taking into account item 2, it will be even harder for the user to achieve this. 4. Q: Do we need such accurate errors returned from the server: InvalidArgumentPartitions, InvalidArgumentReplicas etc.? A: I started the implementation to add the proposed error codes and now I think probably InvalidArgumentError should be sufficient. We can do simple validations on the client side (e.g. AdminClient can ensure the nr of partitions argument is positive); others - which can be covered only on the server (probably invalid topic config, replica assignment including a dead broker etc.) - will be done on the server, and in case of an invalid argument we will return InvalidArgumentError without specifying the concrete field. It'd be great if we could cover these remaining issues; looks like they are minor, at least related to specific messages, not the overall protocol. - I think with that I can update the confluence page and update the patch to reflect all discussed items. This patch will probably include Wire protocol messages and server-side code to handle new requests. AdminClient and cli-tool implementation can be the next step. Thanks, Andrii Biletskyi On Wed, Apr 15, 2015 at 7:26 PM, Jun Rao j...@confluent.io wrote: Andrii, 500. I think what you suggested also sounds reasonable.
Since ISR is only maintained accurately at the leader, TMR can return ISR if the broker is the leader of a partition. Otherwise, we can return an empty ISR. For partition reassignment, it would be useful to know the lag of each follower. Again, the leader knows this info. We can probably include that info in TMR as well. 300. I think it's probably reasonable to restrict AlterTopicRequest to change only one thing at a time, i.e., either partitions, replicas, replica assignment or config. Thanks, Jun On Mon, Apr 13, 2015 at 10:56 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Jun, 404. Great, thanks! 500. If I understand correctly KAFKA-1367 says ISR part of TMR may be inconsistent. If so, then I believe all admin commands but describeTopic are not affected. Let me emphasize that it's about AdminClient operations, not about Wire Protocol requests. What I mean: To verify AdminClient.createTopic we will need (consistent) 'topics' set from TMR (we don't need isr) To verify alterTopic - again, probably 'topics' and 'assigned replicas' + configs To verify deleteTopic - only 'topics' To verify
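As an illustration of the TMR shape being discussed (field names are placeholders, not a final wire-protocol definition):
{code}
import java.util.List;

// Per-partition metadata as proposed: the leader reports ISR plus, for each
// replica that is not in the ISR, its id and lag in messages; a non-leader
// returns empty arrays for both.
class PartitionMetadataSketch {
    int partition;
    int leader;
    List<Integer> isr;        // empty when this broker is not the leader
    List<ReplicaLag> lags;    // one entry per out-of-sync replica

    static class ReplicaLag {
        int replicaId;
        long lagMessages;
    }
}
{code}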
Re: Review Request 33239: Patch for KAFKA-2126
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33239/#review80394 --- Ship it! Ship It! - Jiangjie Qin On April 15, 2015, 8:19 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33239/ --- (Updated April 15, 2015, 8:19 p.m.) Review request for kafka. Bugs: KAFKA-2126 https://issues.apache.org/jira/browse/KAFKA-2126 Repository: kafka Description --- KAFKA-2126: Configure automatically instantiated deserializers in new consumer. Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99 Diff: https://reviews.apache.org/r/33239/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: [DISCUSSION] KIP-11: ACL Management
Hi, Gwen, What you suggested seems reasonable. I guess we will need the Principal, Privilege pair and the Resource in grant() and revoke()? Is the Hive authorization api the following? It's weird that it takes user in checkPermissions(), but not in authorize(). http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.html I was imagining that a default implementation could be similar to how we store offsets in Kafka. Basically, store all ACLs in a special topic with compact retention. Then, every broker will build an in-memory cache off that topic. Another thing that we haven't discussed so far is how to manage ACLs across different mirrored Kafka clusters. Let's say you use mirror maker to mirror all topics from cluster A to cluster B. You probably want to have exactly the same ACL on both A and B. It would be good if the ACL can be set up just once. If we use the above default implementation, since the ACL topic is mirrored too, the ACL will be propagated automatically. Thanks, Jun On Thu, Apr 16, 2015 at 9:44 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Authorization Fans, I'm starting a new thread on a specific sub-topic of KIP-11, since this is a bit long :) Currently KIP-11, as I understand it, proposes: * Authorizers are pluggable, with Kafka providing DefaultAuthorizer. * Kafka tools allow adding / managing ACLs. * Those ACLs are stored in ZK and cached in a new TopicCache * Authorizers can either use the ACLs defined and stored in Kafka, or define and use their own. I am concerned about two possible issues with this design: 1. Separation of concerns - only authorizers should worry about ACLs, and therefore the less code for ACLs that exists in Kafka core, the better. 2. User confusion - It sounded like we can define ACLs in Kafka itself but authorizers can also define their own, so kafka-topics --describe may show an ACL different than the one in use. This can be super confusing for admins. My alternative suggestion: * Authorizer API will include: grantPrivilege(List<Principal>, List<Privilege>), revokePrivilege(List<Principal>, List<Privilege>), getPrivilegesByPrincipal(Principal, Resource) (The exact API can be discussed in detail, but you get the idea) * Kafka tools will simply invoke these APIs when topics are added / modified / described. * Each authorizer (including the default one) will be responsible for storing, caching and using those ACLs. This way, we keep almost all ACL code with the Authorizer, where it belongs, and users get a nice unified interface that reflects what is actually getting used in the system. This is pretty much how Sqoop and Hive implement their authorization APIs. What do you think? Gwen
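In Java terms, the suggested surface might look like this; Principal, Privilege and Resource are placeholder types, since the thread explicitly leaves the exact API open for discussion:
{code}
import java.util.List;

interface Principal { }
interface Privilege { }
interface Resource { }

// Sketch of the alternative Authorizer API: grants, revokes and lookups all
// live behind the authorizer, which owns storage and caching of the ACLs.
interface Authorizer {
    void grantPrivilege(List<Principal> principals, List<Privilege> privileges);
    void revokePrivilege(List<Principal> principals, List<Privilege> privileges);
    List<Privilege> getPrivilegesByPrincipal(Principal principal, Resource resource);
}
{code}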
[jira] [Created] (KAFKA-2129) Consumer could make multiple concurrent metadata requests
Tim Brooks created KAFKA-2129: - Summary: Consumer could make multiple concurrent metadata requests Key: KAFKA-2129 URL: https://issues.apache.org/jira/browse/KAFKA-2129 Project: Kafka Issue Type: Bug Components: clients Reporter: Tim Brooks Priority: Minor The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. This flag protects against multiple metadata requests being made; it is read in poll() on the NetworkClient and written when a request is initiated. This is fine for the producer, which seems to have a single writing thread. The KafkaConsumer's poll() method is synchronized, so there will not be more than one writer entering from there. However, the NetworkClient's poll() method is also reached through the Consumer's partitionsFor() method, which could be called by a separate thread. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
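One way the race could be closed, sketched below: make the flag an AtomicBoolean and claim it with compareAndSet so two threads reaching poll() cannot both issue a metadata request. This only illustrates the reported problem; it is not the patch that eventually landed:
{code}
import java.util.concurrent.atomic.AtomicBoolean;

class MetadataFetchGuard {
    private final AtomicBoolean metadataFetchInProgress = new AtomicBoolean(false);

    void maybeFetchMetadata() {
        // Only the thread that wins the CAS sends a request; the flag stays
        // set until the response (or a failure) clears it.
        if (metadataFetchInProgress.compareAndSet(false, true)) {
            try {
                sendMetadataRequest();
            } catch (RuntimeException e) {
                metadataFetchInProgress.set(false);  // allow a retry after failure
                throw e;
            }
        }
    }

    void onMetadataResponse() {
        metadataFetchInProgress.set(false);
    }

    private void sendMetadataRequest() {
        // placeholder for the real request path in NetworkClient.poll()
    }
}
{code}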
Re: Review Request 33242: Patch for KAFKA-2121
On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 548 https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line548 One idea for making this less verbose and redundant: make all of these classes implement Closeable so we can just write one utility method for trying to close something and catching the exception. Steven Wu wrote: yes. I thought about it. it may break binary compatibility, e.g. Serializer. Sender and Metrics classes are probably only used internally. let me know your thoughts. I'm pretty sure it's fine, based on this: "Changing the direct superclass or the set of direct superinterfaces of a class type will not break compatibility with pre-existing binaries, provided that the total set of superclasses or superinterfaces, respectively, of the class type loses no members." from https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80346 --- On April 16, 2015, 5:44 p.m., Steven Wu wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ --- (Updated April 16, 2015, 5:44 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- add a unit test file changes based on Ewen's review feedbacks fix capitalization in error log Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION Diff: https://reviews.apache.org/r/33242/diff/ Testing --- Thanks, Steven Wu
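The utility method being discussed could be as small as the sketch below, once the producer's components implement Closeable (CloseUtils/closeQuietly are assumed names for illustration, not an existing Kafka API):
{code}
import java.io.Closeable;

final class CloseUtils {
    private CloseUtils() { }

    // Close a component, swallowing (but recording) any failure, and
    // tolerating nulls so it is safe on a partially-constructed producer.
    static void closeQuietly(Closeable closeable, String name) {
        if (closeable == null)
            return;
        try {
            closeable.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);  // real code would log
        }
    }
}
{code}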
[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498431#comment-14498431 ] Ewen Cheslack-Postava commented on KAFKA-2098: -- [~alexcb] That works as long as you have a version of gradle that works with the project. The other thing the wrapper does is let the project specify the version of gradle to build with, which makes compatibility simpler -- if you have an older system-installed gradle, the wrapper lets you use a newer version just for that specific project. This makes builds more robust since they don't have to support every version of gradle that people might have installed. Unfortunately the gradlew bootstrapping doesn't work with all versions, which is why Gradle recommends checking in the scripts and jars. For example, setting up Kafka for development on Ubuntu Trusty is kind of a pain right now because the version of Gradle (1.4) is too old to get past the bootstrapping phase, so you still need to download Gradle separately just to bootstrap the wrapper scripts. Gradle Wrapper Jar gone missing in 0.8.2.1 -- Key: KAFKA-2098 URL: https://issues.apache.org/jira/browse/KAFKA-2098 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Rekha Joshi Assignee: Rekha Joshi ./gradlew idea Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain This was working in 0.8.2.Attaching patch.Thanks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Build failed in Jenkins: Kafka-trunk #464
See https://builds.apache.org/job/Kafka-trunk/464/changes Changes: [junrao] kafka-2113; TestPurgatoryPerformance does not compile using IBM JDK; patched by Rajini Sivaram; reviewed by Yasuhiro Matsuda and Jun Rao -- [...truncated 1245 lines...] kafka.log.LogTest testParseTopicPartitionNameForEmptyName PASSED kafka.log.LogTest testParseTopicPartitionNameForNull PASSED kafka.log.LogTest testParseTopicPartitionNameForMissingSeparator PASSED kafka.log.LogTest testParseTopicPartitionNameForMissingTopic PASSED kafka.log.LogTest testParseTopicPartitionNameForMissingPartition PASSED kafka.log.LogSegmentTest testReadOnEmptySegment PASSED kafka.log.LogSegmentTest testReadBeforeFirstOffset PASSED kafka.log.LogSegmentTest testMaxOffset PASSED kafka.log.LogSegmentTest testReadAfterLast PASSED kafka.log.LogSegmentTest testReadFromGap PASSED kafka.log.LogSegmentTest testTruncate PASSED kafka.log.LogSegmentTest testTruncateFull PASSED kafka.log.LogSegmentTest testNextOffsetCalculation PASSED kafka.log.LogSegmentTest testChangeFileSuffixes PASSED kafka.log.LogSegmentTest testRecoveryFixesCorruptIndex PASSED kafka.log.LogSegmentTest testRecoveryWithCorruptMessage PASSED kafka.log.LogConfigTest testFromPropsDefaults PASSED kafka.log.LogConfigTest testFromPropsEmpty PASSED kafka.log.LogConfigTest testFromPropsToProps PASSED kafka.log.LogConfigTest testFromPropsInvalid PASSED kafka.log.CleanerTest testCleanSegments PASSED kafka.log.CleanerTest testCleaningWithDeletes PASSED kafka.log.CleanerTest testCleaningWithUnkeyedMessages PASSED kafka.log.CleanerTest testCleanSegmentsWithAbort PASSED kafka.log.CleanerTest testSegmentGrouping PASSED kafka.log.CleanerTest testSegmentGroupingWithSparseOffsets PASSED kafka.log.CleanerTest testBuildOffsetMap PASSED kafka.log.FileMessageSetTest testWrittenEqualsRead PASSED kafka.log.FileMessageSetTest testIteratorIsConsistent PASSED kafka.log.FileMessageSetTest testSizeInBytes PASSED kafka.log.FileMessageSetTest testWriteTo PASSED kafka.log.FileMessageSetTest testTruncate PASSED kafka.log.FileMessageSetTest testFileSize PASSED kafka.log.FileMessageSetTest testIterationOverPartialAndTruncation PASSED kafka.log.FileMessageSetTest testIterationDoesntChangePosition PASSED kafka.log.FileMessageSetTest testRead PASSED kafka.log.FileMessageSetTest testSearch PASSED kafka.log.FileMessageSetTest testIteratorWithLimits PASSED kafka.log.OffsetMapTest testBasicValidation PASSED kafka.log.OffsetMapTest testClear PASSED kafka.log.OffsetIndexTest truncate PASSED kafka.log.OffsetIndexTest randomLookupTest PASSED kafka.log.OffsetIndexTest lookupExtremeCases PASSED kafka.log.OffsetIndexTest appendTooMany PASSED kafka.log.OffsetIndexTest appendOutOfOrder PASSED kafka.log.OffsetIndexTest testReopen PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[0] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[1] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[2] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[3] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[4] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[5] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[6] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[7] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[8] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[9] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[10] PASSED kafka.log.BrokerCompressionTest 
testBrokerSideCompression[11] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[12] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[13] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[14] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[15] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[16] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[17] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[18] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[19] PASSED kafka.log.LogManagerTest testCreateLog PASSED kafka.log.LogManagerTest testGetNonExistentLog PASSED kafka.log.LogManagerTest testCleanupExpiredSegments PASSED kafka.log.LogManagerTest testCleanupSegmentsToMaintainSize PASSED kafka.log.LogManagerTest testTimeBasedFlush PASSED kafka.log.LogManagerTest testLeastLoadedAssignment PASSED kafka.log.LogManagerTest testTwoLogManagersUsingSameDirFails PASSED kafka.log.LogManagerTest testCheckpointRecoveryPoints PASSED kafka.log.LogManagerTest testRecoveryDirectoryMappingWithTrailingSlash PASSED kafka.log.LogManagerTest testRecoveryDirectoryMappingWithRelativeDirectory PASSED kafka.log.LogCleanerIntegrationTest cleanerTest PASSED
Re: [DISCUSSION] KIP-11: ACL Management
Hi Gwen - I tend to agree with your proposal. As you mention, the exact details / interfaces would need to be worked out, but this would be more in line with how JAAS and JACC work in the Java / JEE worlds. I do think that it might be nice to include / provide some helper APIs / methods for caching (so that people don't accidentally cause OOM situations) and it would be convenient to have some type of distribution / synchronization mechanism for local / file based ACL lists (again, perhaps it would be possible to provide a standard helper method for using Zookeeper?). -- Gari On Thu, Apr 16, 2015 at 11:44 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Authorization Fans, I'm starting a new thread on a specific sub-topic of KIP-11, since this is a bit long :) Currently KIP-11, as I understand it, proposes: * Authorizers are pluggable, with Kafka providing DefaultAuthorizer. * Kafka tools allow adding / managing ACLs. * Those ACLs are stored in ZK and cached in a new TopicCache * Authorizers can either use the ACLs defined and stored in Kafka, or define and use their own. I am concerned about two possible issues with this design: 1. Separation of concerns - only authorizers should worry about ACLs, and therefore the less code for ACLs that exists in Kafka core, the better. 2. User confusion - It sounded like we can define ACLs in Kafka itself but authorizers can also define their own, so kafka-topics --describe may show an ACL different than the one in use. This can be super confusing for admins. My alternative suggestion: * Authorizer API will include: grantPrivilege(List<Principal>, List<Privilege>), revokePrivilege(List<Principal>, List<Privilege>), getPrivilegesByPrincipal(Principal, Resource) (The exact API can be discussed in detail, but you get the idea) * Kafka tools will simply invoke these APIs when topics are added / modified / described. * Each authorizer (including the default one) will be responsible for storing, caching and using those ACLs. This way, we keep almost all ACL code with the Authorizer, where it belongs, and users get a nice unified interface that reflects what is actually getting used in the system. This is pretty much how Sqoop and Hive implement their authorization APIs. What do you think? Gwen
[jira] [Updated] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.
[ https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1660: Attachment: KAFKA-1660_2015-04-16_11:35:37.patch Ability to call close() with a timeout on the Java Kafka Producer. --- Key: KAFKA-1660 URL: https://issues.apache.org/jira/browse/KAFKA-1660 Project: Kafka Issue Type: Improvement Components: clients, producer Affects Versions: 0.8.2.0 Reporter: Andrew Stein Assignee: Jiangjie Qin Fix For: 0.8.3 Attachments: KAFKA-1660.patch, KAFKA-1660.patch, KAFKA-1660_2015-02-17_16:41:19.patch, KAFKA-1660_2015-03-02_10:41:49.patch, KAFKA-1660_2015-03-08_21:14:50.patch, KAFKA-1660_2015-03-09_12:56:39.patch, KAFKA-1660_2015-03-25_10:55:42.patch, KAFKA-1660_2015-03-27_16:35:42.patch, KAFKA-1660_2015-04-07_18:18:40.patch, KAFKA-1660_2015-04-08_14:01:12.patch, KAFKA-1660_2015-04-10_15:08:54.patch, KAFKA-1660_2015-04-16_11:35:37.patch I would like the ability to call {{close}} with a timeout on the Java Client's KafkaProducer. h6. Workaround Currently, it is possible to ensure that {{close}} will return quickly by first doing a {{future.get(timeout)}} on the last future produced on each partition, but this means that the user has to define the partitions up front at the time of {{send}} and track the returned {{future}}'s -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.
[ https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498469#comment-14498469 ] Jiangjie Qin commented on KAFKA-1660: - Updated reviewboard https://reviews.apache.org/r/31850/diff/ against branch origin/trunk Ability to call close() with a timeout on the Java Kafka Producer. --- Key: KAFKA-1660 URL: https://issues.apache.org/jira/browse/KAFKA-1660 Project: Kafka Issue Type: Improvement Components: clients, producer Affects Versions: 0.8.2.0 Reporter: Andrew Stein Assignee: Jiangjie Qin Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
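The workaround described in the ticket can be sketched as follows; the per-partition bookkeeping (and the assumption that records carry explicit partitions) is the caller's burden, which is exactly why a close(timeout) API is nicer:
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class BoundedClose {
    // Last future returned by send() for each explicitly chosen partition.
    private final Map<Integer, Future<RecordMetadata>> lastFuture =
            new HashMap<Integer, Future<RecordMetadata>>();

    void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record) {
        Future<RecordMetadata> f = producer.send(record);
        if (record.partition() != null)
            lastFuture.put(record.partition(), f);
    }

    // Wait (bounded) on the last future per partition, then close(); since
    // sends complete in order per partition, close() then returns quickly.
    void closeWithin(KafkaProducer<byte[], byte[]> producer, long timeoutMs) {
        for (Future<RecordMetadata> f : lastFuture.values()) {
            try {
                f.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                // timed out or failed; stop waiting on this partition
            }
        }
        producer.close();
    }
}
{code}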
Re: Review Request 31850: Patch for KAFKA-1660
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/ --- (Updated April 16, 2015, 6:35 p.m.) Review request for kafka. Bugs: KAFKA-1660 https://issues.apache.org/jira/browse/KAFKA-1660 Repository: kafka Description (updated) --- A minor fix. Incorporated Guozhang's comments. Modify according to the latest conclusion. Patch for the finally passed KIP-15git status Addressed Joel and Guozhang's comments. rebased on trunk Rebase on trunk Addressed Joel's comments. Addressed Joel's comments Addressed Jay's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db Diff: https://reviews.apache.org/r/31850/diff/ Testing --- Unit tests passed. Thanks, Jiangjie Qin
Re: Review Request 31850: Patch for KAFKA-1660
On April 11, 2015, 8:02 p.m., Jay Kreps wrote: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 155 https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line155 This scheme is clever but non-obvious, is there a simpler way? Jiangjie Qin wrote: I'm not sure if there is a simpler way. Maybe we can review the current approach again and see if we can simplify it. The goals we want to achieve here are: 1. When abortIncompleteBatches() finishes, no more messages should be appended. 2. Make sure that when hasUnsent() returns false, it does not miss any batch. The current solutions for both depend on setting the close flag first. To achieve (1), the implementation now sets a close flag first and waits until all ongoing appends (if any) finish. To achieve (2), the implementation synchronizes on the deque. When an append grabs the deque lock, it first checks whether the close flag is set. If it is set, that means hasUnsent() might have already checked this deque, so it is not safe to append a new batch anymore. Otherwise it is safe to append a new batch. Thought about this again. And I went back to using the ReadWriteLock with a small modification using tryLock. Hope it makes things cleaner. The idea is exactly the same as the current approach but the code is less confusing. I put some reasoning below, please let me know if you have any suggestions. Essentially we want to make sure no messages or batches are left behind after 1. calling abortIncompleteBatches() 2. hasUnsent() returned false (hasUnsent() only cares about batches, not messages). This means we need to make sure an append will not proceed after these two events. We set the close flag before these two events to reject appends. And the key issue we want to solve here is how to deal with the appending thread. To detect whether there are appending threads, we need either an explicit flag or an exclusive lock (the ReadWriteLock as in the previous patch). If there are appends going on, we have two options: A. Fail the append. B. Wait until appends are done. It is a little bit difficult to fail the append because it is difficult to know which step the append is in. hasUnsent() uses the deque lock to make sure no new batch can be added to a deque after it has been checked by hasUnsent(). That's why we need to check the close flag in append again after grabbing the deque lock. For abortIncompleteBatches, currently we use option B. One tricky thing here is that a thread might block on a full buffer while abortIncompleteBatches is waiting. This would lead to deadlock if abortIncompleteBatches is called from the sender thread, as no memory will be released by the sender thread while it waits for the append, which is itself waiting for memory. This means we need to keep the sender thread running to release memory. To solve this issue, instead of blocking on acquiring the write lock, a tryLock is used; if it returns false, we keep failing batches to release memory until we grab the write lock successfully. Then we abort batches one last time. What do you think about this approach? On April 11, 2015, 8:02 p.m., Jay Kreps wrote: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 539 https://reviews.apache.org/r/31850/diff/8/?file=923924#file923924line539 It wouldn't block forever, that isn't correct, it would just block for the period of time they specified. Jiangjie Qin wrote: We are saying we will call close(0) instead of having the sender thread call close(timeout).
And we do this to *avoid* blocking forever. I removed the comment to avoid confusion. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review79813 --- On April 16, 2015, 6:35 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/ --- (Updated April 16, 2015, 6:35 p.m.) Review request for kafka. Bugs: KAFKA-1660 https://issues.apache.org/jira/browse/KAFKA-1660 Repository: kafka Description --- A minor fix. Incorporated Guozhang's comments. Modify according to the latest conclusion. Patch for the finally passed KIP-15git status Addressed Joel and Guozhang's comments. rebased on trunk Rebase on trunk Addressed Joel's comments. Addressed Joel's comments Addressed Jay's comments Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
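As a sketch of the tryLock scheme described above (placeholder method bodies; the real RecordAccumulator is more involved): appenders hold the read lock, so owning the write lock proves no append is in flight, and the closer keeps aborting batches, releasing memory so blocked appenders can finish, until the write lock is won:
{code}
import java.util.concurrent.locks.ReentrantReadWriteLock;

class AccumulatorCloseSketch {
    private final ReentrantReadWriteLock appendLock = new ReentrantReadWriteLock();
    private volatile boolean closed = false;

    void append(byte[] record) {
        appendLock.readLock().lock();
        try {
            if (closed)
                throw new IllegalStateException("Cannot append after close");
            doAppend(record);  // may block waiting for buffer memory
        } finally {
            appendLock.readLock().unlock();
        }
    }

    void abortIncompleteBatches() {
        closed = true;  // reject new appends
        // Cannot block on the write lock: an appender blocked on memory would
        // deadlock us. Abort batches (freeing memory) until the lock is won.
        while (!appendLock.writeLock().tryLock())
            abortBatchesOnce();
        try {
            abortBatchesOnce();  // final pass: no appenders can be in flight
        } finally {
            appendLock.writeLock().unlock();
        }
    }

    private void doAppend(byte[] record) { }

    private void abortBatchesOnce() { }
}
{code}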