[jira] [Updated] (KAFKA-1866) LogStartOffset gauge throws exceptions after log.delete()
[ https://issues.apache.org/jira/browse/KAFKA-1866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani updated KAFKA-1866: -- Attachment: KAFKA-1866_2015-02-11_09:25:33.patch LogStartOffset gauge throws exceptions after log.delete() - Key: KAFKA-1866 URL: https://issues.apache.org/jira/browse/KAFKA-1866 Project: Kafka Issue Type: Bug Reporter: Gian Merlino Assignee: Sriharsha Chintalapani Attachments: KAFKA-1866.patch, KAFKA-1866_2015-02-10_22:50:09.patch, KAFKA-1866_2015-02-11_09:25:33.patch The LogStartOffset gauge does logSegments.head.baseOffset, which throws NoSuchElementException on an empty list, which can occur after a delete() of the log. This makes life harder for custom MetricsReporters, since they have to deal with .value() possibly throwing an exception. Locally we're dealing with this by having Log.delete() also call removeMetric on all the gauges. That also has the benefit of not having a bunch of metrics floating around for logs that the broker is not actually handling. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
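In code, the defensive gauge described in the report would look roughly like this (a sketch only, assuming the newGauge/removeMetric helpers from Kafka core's KafkaMetricsGroup; it is not the attached patch):
{code}
// Sketch: guard the gauge against an empty segment list instead of calling
// logSegments.head directly (which throws NoSuchElementException).
newGauge("LogStartOffset",
  new Gauge[Long] {
    def value: Long = logSegments.headOption.map(_.baseOffset).getOrElse(-1L) // -1L is an illustrative sentinel
  })

// And, per the reporter's suggestion, drop the metrics when the log goes away:
def delete(): Unit = {
  // ... delete segments ...
  removeMetric("LogStartOffset")
}
{code}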
[jira] [Commented] (KAFKA-1757) Can not delete Topic index on Windows
[ https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316454#comment-14316454 ] Sriharsha Chintalapani commented on KAFKA-1757: --- [~junrao] Can you please review the patch? Thanks. Can not delete Topic index on Windows - Key: KAFKA-1757 URL: https://issues.apache.org/jira/browse/KAFKA-1757 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.2.0 Reporter: Lukáš Vyhlídka Assignee: Sriharsha Chintalapani Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-1757.patch, lucky-v.patch When running Kafka 0.8.2-Beta (Scala 2.10) on Windows, an attempt to delete the Topic threw an error: ERROR [KafkaApi-1] error when handling request Name: StopReplicaRequest; Version: 0; CorrelationId: 38; ClientId: ; DeletePartitions: true; ControllerId: 0; ControllerEpoch: 3; Partitions: [test,0] (kafka.server.KafkaApis) kafka.common.KafkaStorageException: Delete of index .index failed. at kafka.log.LogSegment.delete(LogSegment.scala:283) at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608) at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.log.Log.delete(Log.scala:608) at kafka.log.LogManager.deleteLog(LogManager.scala:375) at kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:144) at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139) at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.utils.Utils$.inWriteLock(Utils.scala:543) at kafka.cluster.Partition.delete(Partition.scala:139) at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:158) at kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:191) at kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:190) at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:190) at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:96) at kafka.server.KafkaApis.handle(KafkaApis.scala:59) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59) at java.lang.Thread.run(Thread.java:744) When I investigated the issue I figured out that the index file (in my environment it was C:\tmp\kafka-logs\----0014-0\.index) was locked by the kafka process and the OS did not allow that file to be deleted. I tried to fix the problem in the source code, and when I added a close() call into LogSegment.delete(), the Topic deletion started to work. I will add here (not sure how to upload the file during issue creation) a diff with the changes I have made so you can take a look at whether it is reasonable or not. It would be perfect if it could make it into the product... In the end I would like to say that on Linux the deletion works just fine... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
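The reporter's close-before-delete suggestion amounts to roughly the following change in LogSegment (a sketch with abbreviated member names, not the attached diff):
{code}
/* Sketch of the suggested fix: on Windows a memory-mapped index file cannot
 * be deleted while it is still mapped, so unmap/close it before deleting. */
def delete() {
  index.close()                        // release the mmap before deleting
  val deletedLog = log.delete()
  val deletedIndex = index.delete()
  if (!deletedLog && log.file.exists)
    throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
  if (!deletedIndex && index.file.exists)
    throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
}
{code}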
[jira] [Commented] (KAFKA-1866) LogStartOffset gauge throws exceptions after log.delete()
[ https://issues.apache.org/jira/browse/KAFKA-1866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316572#comment-14316572 ] Sriharsha Chintalapani commented on KAFKA-1866: --- Updated reviewboard https://reviews.apache.org/r/30084/diff/ against branch origin/trunk LogStartOffset gauge throws exceptions after log.delete() - Key: KAFKA-1866 URL: https://issues.apache.org/jira/browse/KAFKA-1866 Project: Kafka Issue Type: Bug Reporter: Gian Merlino Assignee: Sriharsha Chintalapani Attachments: KAFKA-1866.patch, KAFKA-1866_2015-02-10_22:50:09.patch, KAFKA-1866_2015-02-11_09:25:33.patch The LogStartOffset gauge does logSegments.head.baseOffset, which throws NoSuchElementException on an empty list, which can occur after a delete() of the log. This makes life harder for custom MetricsReporters, since they have to deal with .value() possibly throwing an exception. Locally we're dealing with this by having Log.delete() also call removeMetric on all the gauges. That also has the benefit of not having a bunch of metrics floating around for logs that the broker is not actually handling. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
Hi Jay, The data channels are actually a big part of the complexity of the zero data loss design, though, right? Because then you need some reverse channel to flow the acks back to the consumer based on where you are versus just acking what you have read and written (as in the code snippet I put up). I'm not sure if we are on the same page. Even if the data channel was not there the current handling for zero data loss would remain very similar - you would need to maintain lists of unacked source offsets. I'm wondering if the KIP needs more detail on how it is currently implemented; or are suggesting a different approach (in which case I have not fully understood). I'm not sure what you mean by flowing acks back to the consumer - the MM commits offsets after the producer ack has been received. There is some additional complexity introduced in reducing duplicates on a rebalance - this is actually optional (since duplicates are currently a given). The reason that was done anyway is that with the auto-commit turned off duplicates are almost guaranteed on a rebalance. I think the point that Neha and I were trying to make was that the motivation to embed stuff into MM kind of is related to how complex a simple consume and produce with good throughput will be. If it is simple to write such a thing in a few lines, the pain of embedding a bunch of stuff won't be worth it, if it has to be as complex as the current mm then of course we will need all kinds of plug ins because no one will be able to write such a thing. I don't have a huge concern with a simple plug-in but I think if it turns into something more complex with filtering and aggregation or whatever we really need to stop and think a bit about the design. I agree - I don't think there is a use-case for any complex plug-in. It is pretty much what Becket has described currently for the message handler - i.e., take an incoming record and return a list of outgoing records (which could be empty if you filter). So here is my take on the MM: - Bare bones: simple consumer - producer pairs (0.7 style). This is ideal, but does not handle no data loss - Above plus support no data loss. This actually adds quite a bit of complexity. - Above plus the message handler. This is a trivial addition I think that makes the MM usable in a few other mirroring-like applications. Joel On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy jjkosh...@gmail.com wrote: On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede wrote: I think all of us agree that we want to design MirrorMaker for 0 data loss. With the absence of the data channel, 0 data loss will be much simpler to implement. The data channel is irrelevant to the implementation of zero data loss. The complexity in the implementation of no data loss that you are seeing in mirror-maker affects all consume-then-produce patterns whether or not there is a data channel. You still need to maintain a list of unacked offsets. What I meant earlier is that we can brainstorm completely different approaches to supporting no data loss, but the current implementation is the only solution we are aware of. My arguments for adding a message handler are that: 1. It is more efficient to do something in common for all the clients in pipeline than letting each client do the same thing for many times. And there are concrete use cases for the message handler already. What are the concrete use cases? I think Becket already described a couple of use cases earlier in the thread. quote 1. Format conversion. 
We have a use case where clients of source cluster use an internal schema and clients of target cluster use a different public schema. 2. Message filtering: For the messages published to source cluster, there are some messages private to source cluster clients and should not exposed to target cluster clients. It would be difficult to publish those messages into different partitions because they need to be ordered. I agree that we can always filter/convert messages after they are copied to the target cluster, but that costs network bandwidth unnecessarily, especially if that is a cross colo mirror. With the handler, we can co-locate the mirror maker with source cluster and save that cost. Also, imagine there are many downstream consumers consuming from the target cluster, filtering/reformatting the messages before the messages reach the target cluster is much more efficient than having each of the consumers do this individually on their own. /quote Also the KIP still refers to the data channel in a few places (Motivation and On consumer rebalance sections). Can you update the wiki so it is easier to review the new design, especially the data loss part. On Tue, Feb 10, 2015 at 10:36 AM, Joel Koshy jjkosh...@gmail.com wrote: I think the message handler adds little to no
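For concreteness, the message handler being discussed could be as small as the following sketch (a hypothetical shape with hypothetical names, not a committed API):
{code}
import kafka.message.MessageAndMetadata
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical sketch of the proposed mirror-maker message handler:
// one record in, zero or more records out (an empty list filters it out).
trait MirrorMakerMessageHandler {
  def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): Seq[ProducerRecord]
}

// Example: drop records whose key marks them as private to the source cluster.
class FilteringHandler(isPrivate: Array[Byte] => Boolean) extends MirrorMakerMessageHandler {
  def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): Seq[ProducerRecord] =
    if (record.key() != null && isPrivate(record.key())) Seq.empty
    else Seq(new ProducerRecord(record.topic, record.key(), record.message()))
}
{code}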
[DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Hi, Here is the initial proposal for the SASL/Kerberos implementation for Kafka: https://cwiki.apache.org/confluence/x/YI4WAw and JIRA: https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on a prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with an initial prototype patch. Thanks, Harsha
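Since the KIP page is still sparse, it may help readers to know that Kerberos authentication on the JVM is normally wired up through a JAAS login configuration. A minimal sketch of a broker-side entry follows; the context name "KafkaServer", keytab path, and principal are illustrative assumptions, not something the KIP has settled:
{code}
// JAAS configuration sketch (standard JVM Krb5LoginModule syntax);
// section name, paths, and principal are assumptions for illustration.
KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/kafka.service.keytab"
  principal="kafka/broker1.example.com@EXAMPLE.COM";
};
{code}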
[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)
[ https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316455#comment-14316455 ] Sriharsha Chintalapani commented on KAFKA-1566: --- [~jkreps] [~nehanarkhede] Can you please review this? kafka-env.sh will allow the flexibility of defining a custom java_home for the users. Kafka environment configuration (kafka-env.sh) -- Key: KAFKA-1566 URL: https://issues.apache.org/jira/browse/KAFKA-1566 Project: Kafka Issue Type: Improvement Components: tools Reporter: Cosmin Lehene Assignee: Sriharsha Chintalapani Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1566.patch It would be useful (especially for automated deployments) to have an environment configuration file that could be sourced from the launcher files (e.g. kafka-run-server.sh). This is how it could look: kafka-env.sh
{code}
export KAFKA_JVM_PERFORMANCE_OPTS='-XX:+UseCompressedOops -XX:+DisableExplicitGC -Djava.awt.headless=true \
  -XX:+UseG1GC -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35'
export KAFKA_HEAP_OPTS='-Xmx1G -Xms1G'
export KAFKA_LOG4J_OPTS=-Dkafka.logs.dir=/var/log/kafka
{code}
kafka-server-start.sh
{code}
...
source $base_dir/config/kafka-env.sh
...
{code}
This approach is consistent with Hadoop and HBase. However the idea here is to be able to set these values in a single place without having to edit startup scripts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1852) OffsetCommitRequest can commit offset on unknown topic
[ https://issues.apache.org/jira/browse/KAFKA-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316447#comment-14316447 ] Sriharsha Chintalapani commented on KAFKA-1852: --- [~jjkoshy] pinging for a review. Thanks. OffsetCommitRequest can commit offset on unknown topic -- Key: KAFKA-1852 URL: https://issues.apache.org/jira/browse/KAFKA-1852 Project: Kafka Issue Type: Bug Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Sriharsha Chintalapani Attachments: KAFKA-1852.patch, KAFKA-1852_2015-01-19_10:44:01.patch Currently, we allow an offset to be committed to Kafka, even when the topic/partition for the offset doesn't exist. We probably should disallow that and send an error back in that case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
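The fix would presumably amount to a guard in the offset-commit path along these lines (a sketch with approximate names, not the attached patch):
{code}
// Sketch: reject commits for topics this broker knows nothing about,
// instead of blindly writing them to the offsets log.
val (known, unknown) = offsetCommitRequest.requestInfo.partition {
  case (topicAndPartition, _) => metadataCache.contains(topicAndPartition.topic) // name approximate
}
val rejected = unknown.map { case (topicAndPartition, _) =>
  (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
}
// ... commit `known` as before, then merge `rejected` into the OffsetCommitResponse ...
{code}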
[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics
[ https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316526#comment-14316526 ] Jay Kreps commented on KAFKA-1374: -- Hey guys, the test kafka.tools.TestLogCleaning is a very aggressive test that runs against a kafka cluster configured for log compaction. It produces a bunch of messages and compacts them continuously and then does an out of band comparison of the two. It would be good to ensure that still works on really large cleaner runs with deletes with this patch. LogCleaner (compaction) does not support compressed topics -- Key: KAFKA-1374 URL: https://issues.apache.org/jira/browse/KAFKA-1374 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, KAFKA-1374_2015-01-18_00:19:21.patch This is a known issue, but opening a ticket to track. If you try to compact a topic that has compressed messages you will run into various exceptions - typically because during iteration we advance the position based on the decompressed size of the message. I have a bunch of stack traces, but it should be straightforward to reproduce. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
Guozhang, I agree with 1-3, I do think what I was proposing was simpler but perhaps there are gaps in that? Hey Joel--Here was a sketch of what I was proposing. I do think this gets rid of manual offset tracking, especially doing so across threads with dedicated commit threads, which I think is pretty complex.

while(true) {
  val recs = consumer.poll(Long.MaxValue);
  for (rec <- recs)
    producer.send(rec, logErrorCallback)
  if(System.currentTimeMillis - lastCommit > commitInterval) {
    producer.flush()
    consumer.commit()
    lastCommit = System.currentTimeMillis
  }
}

(See the previous email for details). I think the question is: is there any reason--performance, correctness, etc--that this won't work? Basically I think you guys have thought about this more so I may be missing something. If so let's flag it while we still have leeway on the consumer. If we think that will work, well I do think it is conceptually a lot simpler than the current code, though I suppose one could disagree on that. -Jay On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy jjkosh...@gmail.com wrote: Hi Jay, The data channels are actually a big part of the complexity of the zero data loss design, though, right? Because then you need some reverse channel to flow the acks back to the consumer based on where you are versus just acking what you have read and written (as in the code snippet I put up). I'm not sure if we are on the same page. Even if the data channel was not there the current handling for zero data loss would remain very similar - you would need to maintain lists of unacked source offsets. I'm wondering if the KIP needs more detail on how it is currently implemented; or are suggesting a different approach (in which case I have not fully understood). I'm not sure what you mean by flowing acks back to the consumer - the MM commits offsets after the producer ack has been received. There is some additional complexity introduced in reducing duplicates on a rebalance - this is actually optional (since duplicates are currently a given). The reason that was done anyway is that with the auto-commit turned off duplicates are almost guaranteed on a rebalance. I think the point that Neha and I were trying to make was that the motivation to embed stuff into MM kind of is related to how complex a simple consume and produce with good throughput will be. If it is simple to write such a thing in a few lines, the pain of embedding a bunch of stuff won't be worth it, if it has to be as complex as the current mm then of course we will need all kinds of plug ins because no one will be able to write such a thing. I don't have a huge concern with a simple plug-in but I think if it turns into something more complex with filtering and aggregation or whatever we really need to stop and think a bit about the design. I agree - I don't think there is a use-case for any complex plug-in. It is pretty much what Becket has described currently for the message handler - i.e., take an incoming record and return a list of outgoing records (which could be empty if you filter). So here is my take on the MM: - Bare bones: simple consumer - producer pairs (0.7 style). This is ideal, but does not handle no data loss - Above plus support no data loss. This actually adds quite a bit of complexity. - Above plus the message handler. This is a trivial addition I think that makes the MM usable in a few other mirroring-like applications.
Joel On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy jjkosh...@gmail.com wrote: On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede wrote: I think all of us agree that we want to design MirrorMaker for 0 data loss. With the absence of the data channel, 0 data loss will be much simpler to implement. The data channel is irrelevant to the implementation of zero data loss. The complexity in the implementation of no data loss that you are seeing in mirror-maker affects all consume-then-produce patterns whether or not there is a data channel. You still need to maintain a list of unacked offsets. What I meant earlier is that we can brainstorm completely different approaches to supporting no data loss, but the current implementation is the only solution we are aware of. My arguments for adding a message handler are that: 1. It is more efficient to do something in common for all the clients in pipeline than letting each client do the same thing for many times. And there are concrete use cases for the message handler already. What are the concrete use cases? I think Becket already described a couple of use cases earlier in the thread. quote 1. Format conversion. We have a use case where clients of source cluster use an internal schema and clients of target cluster use a different public schema. 2. Message
Re: Review Request 30084: Patch for KAFKA-1866
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30084/ --- (Updated Feb. 11, 2015, 5:25 p.m.) Review request for kafka. Bugs: KAFKA-1866 https://issues.apache.org/jira/browse/KAFKA-1866 Repository: kafka Description --- KAFKA-1866. LogStartOffset gauge throws exceptions after log.delete(). Diffs (updated) - core/src/main/scala/kafka/cluster/Partition.scala e6ad8be5e33b6fb31c078ad78f8de709869ddc04 core/src/main/scala/kafka/log/Log.scala 846023bb98d0fa0603016466360c97071ac935ea core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 3cf23b3d6d4460535b90cfb36281714788fc681c core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 Diff: https://reviews.apache.org/r/30084/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Commented] (KAFKA-1852) OffsetCommitRequest can commit offset on unknown topic
[ https://issues.apache.org/jira/browse/KAFKA-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316739#comment-14316739 ] Joel Koshy commented on KAFKA-1852: --- Thanks for the ping - will take a look. OffsetCommitRequest can commit offset on unknown topic -- Key: KAFKA-1852 URL: https://issues.apache.org/jira/browse/KAFKA-1852 Project: Kafka Issue Type: Bug Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Sriharsha Chintalapani Attachments: KAFKA-1852.patch, KAFKA-1852_2015-01-19_10:44:01.patch Currently, we allow an offset to be committed to Kafka, even when the topic/partition for the offset doesn't exist. We probably should disallow that and send an error back in that case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1943) Producer request failure rate should not include MessageSetSizeTooLarge and MessageSizeTooLargeException
[ https://issues.apache.org/jira/browse/KAFKA-1943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya Auradkar updated KAFKA-1943: --- Issue Type: Sub-task (was: Bug) Parent: KAFKA-1946 Producer request failure rate should not include MessageSetSizeTooLarge and MessageSizeTooLargeException Key: KAFKA-1943 URL: https://issues.apache.org/jira/browse/KAFKA-1943 Project: Kafka Issue Type: Sub-task Reporter: Aditya A Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-1943.patch If MessageSetSizeTooLargeException or MessageSizeTooLargeException is thrown from Log, then ReplicaManager counts it as a failed produce request. My understanding is that this metric should only count failures as a result of broker issues and not bad requests sent by the clients. If the message or message set is too large, then it is a client side error and should not be reported. (similar to NotLeaderForPartitionException, UnknownTopicOrPartitionException). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
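In the produce path this would presumably become a distinction between catch clauses, roughly as follows (a sketch of code inside ReplicaManager/appendToLocalLog, not the actual change):
{code}
// Sketch: oversized messages are a client error, so return the error code
// without marking the failure meters; only unexpected errors count as failures.
try {
  val info = partition.appendMessagesToLeader(messages)
  (topicAndPartition, ProducerResponseStatus(ErrorMapping.NoError, info.firstOffset))
} catch {
  case e @ (_: MessageSizeTooLargeException | _: MessageSetSizeTooLargeException) =>
    // client-side error: surface it, but skip the failure meters
    (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L))
  case e: Throwable =>
    // genuine broker-side failure: this is what the failure meters should measure
    BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
    BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
    (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L))
}
{code}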
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
For the Sample usage section, please consider https://github.com/airbnb/kafkat. We find that tool to be very easy to use, and extremely useful for our administration tasks. Chi On Mon, Feb 9, 2015 at 9:03 AM, Guozhang Wang wangg...@gmail.com wrote: I feel the benefits of lowering the development bar for new clients do not justify the complexity we need to introduce on the server side, as today the clients just need one more request type (metadata request) to send the produce / fetch to the right brokers, whereas a re-routing mechanism will result in complicated between-broker communication patterns that potentially impact Kafka performance and make debugging / troubleshooting much harder. An alternative way to ease the development of the clients is to use a proxy in front of the kafka servers, like the rest proxy we have built before, which we use for non-java clients primarily but which can also be treated as handling cluster metadata discovery for clients. Compared to the re-routing idea, the proxy also introduces two hops but its layered architecture is simpler. Guozhang On Sun, Feb 8, 2015 at 8:00 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jiangjie, Re-routing support doesn't force clients to use it. Java and all existing clients would work as now, where requests are intelligently routed by the client, but this would lower the bar for new clients. That said, I agree the case for re-routing admin commands is much stronger than for data. The idea of separating admin/metadata from data would definitely solve some problems but it would also add a lot of complexity--new ports, thread pools, etc. This is an interesting idea to think over but I'm not sure if it's worth it. Probably a separate effort in any case. -jay On Friday, February 6, 2015, Jiangjie Qin j...@linkedin.com.invalid wrote: I'm a little bit concerned about the request routing among brokers. Typically we have a dominant percentage of produce and fetch requests/responses. Routing them from one broker to another seems undesirable. Also I think we generally have two types of requests/responses: data related and admin related. It is typically a good practice to separate the data plane from the control plane. That suggests we should have another admin port to serve those admin requests and probably have different authentication/authorization from the data port. Jiangjie (Becket) Qin On 2/6/15, 11:18 AM, Joe Stein joe.st...@stealth.ly wrote: I updated the installation and sample usage for the existing patches on the KIP site https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations There are still a few pending items here. 1) There was already some discussion about using the Broker that is the Controller here https://issues.apache.org/jira/browse/KAFKA-1772 and we should elaborate on that more in the thread or agree we are ok with admin asking for the controller to talk to and then just sending that broker the admin tasks. 2) I like this idea https://issues.apache.org/jira/browse/KAFKA-1912 but we can refactor after KAFKA-1694 is committed, no? I know folks just want to talk to the broker that is the controller. It may even become useful to have the controller run on a broker that isn't even a topic broker anymore (small can of worms I am opening here, but it elaborates on Guozhang's hot spot point). 3) any more feedback? - Joe Stein On Fri, Jan 23, 2015 at 3:15 PM, Guozhang Wang wangg...@gmail.com wrote: A centralized admin operation protocol would be very useful.
One more general comment here is that the controller was originally designed to only talk to other brokers through the ControllerChannel, while the broker instance which carries the current controller is agnostic of its existence, and uses KafkaApis to handle general Kafka requests. Having all admin requests redirected to the controller instance will force the broker to be aware of its carried controller, and to access its internal data for handling these requests. Plus, with the number of clients out of Kafka's control, this may easily cause the controller to become a hot spot in terms of request load. On Thu, Jan 22, 2015 at 10:09 PM, Joe Stein joe.st...@stealth.ly wrote: inline On Thu, Jan 22, 2015 at 11:59 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, This is great. A few comments on KIP-4: 1. This is much needed functionality, but there are a lot of these, so let's really think these protocols through. We really want to end up with a set of well thought-out, orthogonal apis. For this reason I think it is really important to think through the end state even if that includes APIs we
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316747#comment-14316747 ] Jay Kreps commented on KAFKA-1646: -- Hey [~waldenchen] yeah I think what you are saying is that the offset in the recovery point checkpoint should always be the last sync'd offset irrespective of segment file boundaries and we should optimize recovery to just recover from that offset rather than always recovering the full last segment. That would work, and actually makes more sense than the current approach, but is a fairly involved and correctness critical change. One of the challenges is the offset index needs to be reconstructed from that point on as well, but there is a bit of a chicken and egg problem, because how do you search into the log at all if the index itself is corrupt? Another approach could be to truncate off the preallocated file extent on clean shutdown. This is actually effectively what we do for the offset indexes anyway. This would also avoid windows specific code since we could do this in all cases. Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch This patch is for the Windows platform only. On Windows, if there is more than one replica writing to disk, the segment log files will not be consistent on disk and consumer reading performance will drop greatly. This fix allocates more disk space when rolling a new segment, and then it will improve the consumer reading performance in the NTFS file system. This patch doesn't affect file allocation of other filesystems, for it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
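Jay's alternative, trimming the preallocated tail on clean shutdown the way the offset index already trims itself, could look roughly like this sketch (helper name and placement are assumptions):
{code}
import java.io.{File, RandomAccessFile}

// Sketch: on clean close, truncate the segment file back to its valid size
// so recovery never has to scan a preallocated, zero-filled tail.
def trimToValidSize(file: File, validBytes: Long): Unit = {
  val raf = new RandomAccessFile(file, "rw")
  try {
    if (raf.length > validBytes)
      raf.setLength(validBytes) // drop the preallocated, never-written region
  } finally {
    raf.close()
  }
}
{code}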
[jira] [Created] (KAFKA-1946) Improve BrokerTopicMetrics reporting
Aditya Auradkar created KAFKA-1946: -- Summary: Improve BrokerTopicMetrics reporting Key: KAFKA-1946 URL: https://issues.apache.org/jira/browse/KAFKA-1946 Project: Kafka Issue Type: Improvement Reporter: Aditya Auradkar Assignee: Aditya Auradkar Creating an umbrella ticket to track improvement of BrokerTopicMetrics reporting. Some of the tasks are: - Add a metric for total fetch/produce requests as opposed to simply failure counts - Tracking offset commit requests separately from produce requests - Adding a metric to track bad requests from clients. (HTTP 4XX vs 5XX as an example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Thanks Harsha, looks good so far. How were you thinking of running the KerberosTicketManager: as a standalone process, like the controller, or is it a layer of code that does the plumbing pieces everywhere? ~ Joestein On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote: Hi, Here is the initial proposal for the SASL/Kerberos implementation for Kafka: https://cwiki.apache.org/confluence/x/YI4WAw and JIRA: https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on a prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with an initial prototype patch. Thanks, Harsha
[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics
[ https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316735#comment-14316735 ] Jay Kreps commented on KAFKA-1374: -- Great! LogCleaner (compaction) does not support compressed topics -- Key: KAFKA-1374 URL: https://issues.apache.org/jira/browse/KAFKA-1374 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, KAFKA-1374_2015-01-18_00:19:21.patch This is a known issue, but opening a ticket to track. If you try to compact a topic that has compressed messages you will run into various exceptions - typically because during iteration we advance the position based on the decompressed size of the message. I have a bunch of stack traces, but it should be straightforward to reproduce. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1914) Count TotalProduceRequestRate and TotalFetchRequestRate in BrokerTopicMetrics
[ https://issues.apache.org/jira/browse/KAFKA-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya Auradkar updated KAFKA-1914: --- Issue Type: Sub-task (was: Bug) Parent: KAFKA-1946 Count TotalProduceRequestRate and TotalFetchRequestRate in BrokerTopicMetrics - Key: KAFKA-1914 URL: https://issues.apache.org/jira/browse/KAFKA-1914 Project: Kafka Issue Type: Sub-task Components: core Reporter: Aditya A Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-1914.patch Currently the BrokerTopicMetrics only counts the failedProduceRequestRate and the failedFetchRequestRate. We should add 2 metrics to count the overall produce/fetch request rates. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
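In BrokerTopicMetrics the description amounts to roughly the following (a sketch with illustrative metric names, assuming the KafkaMetricsGroup.newMeter helper; not the attached patch):
{code}
import java.util.concurrent.TimeUnit

// Sketch: two additional meters next to the existing failure meters,
// counting all produce/fetch requests rather than only the failed ones.
val totalProduceRequestRate = newMeter("TotalProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
val totalFetchRequestRate = newMeter("TotalFetchRequestsPerSec", "requests", TimeUnit.SECONDS)
{code}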
[jira] [Commented] (KAFKA-1419) cross build for scala 2.11
[ https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316760#comment-14316760 ] Helena Edelson commented on KAFKA-1419: --- This ticket says the cross build is available for kafka v 0.8.1.2 with Scala 2.11 but I don't see that artifact anywhere? cross build for scala 2.11 -- Key: KAFKA-1419 URL: https://issues.apache.org/jira/browse/KAFKA-1419 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.1 Reporter: Scott Clasen Assignee: Ivan Lyutov Priority: Blocker Fix For: 0.8.1.2, 0.8.2.0 Attachments: KAFKA-1419-scalaBinaryVersion.patch, KAFKA-1419-scalaBinaryVersion.patch, KAFKA-1419.patch, KAFKA-1419.patch, KAFKA-1419_2014-07-28_15:05:16.patch, KAFKA-1419_2014-07-29_15:13:43.patch, KAFKA-1419_2014-08-04_14:43:26.patch, KAFKA-1419_2014-08-05_12:51:16.patch, KAFKA-1419_2014-08-07_10:17:34.patch, KAFKA-1419_2014-08-07_10:52:18.patch, KAFKA-1419_cross_build_for_scala_2_11_for_0_8_1_branch.patch Please publish builds for scala 2.11, hopefully just needs a small tweak to the gradle conf? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils
[ https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316759#comment-14316759 ] Jay Kreps commented on KAFKA-1926: -- At a high level this looks good. The main thrust of the ticket had actually been to clean up the class Utils.scala and migrate a lot of that to Utils.java, but this mostly actually cleans up other utils packages, which is also good and needed. A few minor comments: 1. The time constants should probably move into Time.java as static final variables, right? 2. If Time moves we may need to move Scheduler, KafkaScheduler, and MockScheduler as they are intertwined. 3. SystemTime.java should probably deprecate SystemTime.scala, right? 4. Time.scala should be deleted? Replace kafka.utils.Utils with o.a.k.common.utils.Utils --- Key: KAFKA-1926 URL: https://issues.apache.org/jira/browse/KAFKA-1926 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2.0 Reporter: Jay Kreps Labels: newbie, patch Attachments: KAFKA-1926.patch There is currently a lot of duplication between the Utils class in common and the one in core. Our plan has been to deprecate duplicate code in the server and replace it with the new common code. As such we should evaluate each method in the scala Utils and do one of the following: 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose utility in active use that is not Kafka-specific. If we migrate it we should really think about the API and make sure there is some test coverage. A few things in there are kind of funky and we shouldn't just blindly copy them over. 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold any utilities that really need to make use of Scala features to be convenient. 3. Delete it if it is not used, or has a bad api. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils
[ https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316830#comment-14316830 ] Tong Li commented on KAFKA-1926: [~jkreps] Jay, really appreciate your quick review comments. I will see what I can do and will submit another patch based on trunk branch. New patch set will come up real soon. Thanks so much. Replace kafka.utils.Utils with o.a.k.common.utils.Utils --- Key: KAFKA-1926 URL: https://issues.apache.org/jira/browse/KAFKA-1926 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2.0 Reporter: Jay Kreps Labels: newbie, patch Attachments: KAFKA-1926.patch There is currently a lot of duplication between the Utils class in common and the one in core. Our plan has been to deprecate duplicate code in the server and replace it with the new common code. As such we should evaluate each method in the scala Utils and do one of the following: 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose utility in active use that is not Kafka-specific. If we migrate it we should really think about the API and make sure there is some test coverage. A few things in there are kind of funky and we shouldn't just blindly copy them over. 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold any utilities that really need to make use of Scala features to be convenient. 3. Delete it if it is not used, or has a bad api. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints
+1 for proposed changes in 1 and 2. 1. The impact is that if someone uses SimpleConsumer and references Broker explicitly, the application needs code change to compile with 0.8.3. Since SimpleConsumer is not widely used, breaking the API in SimpleConsumer but maintaining overall code cleanness seems to be a better tradeoff. 2. For clarification, the issue is the following. In 0.8.3, we will be evolving the wire protocol of UpdateMedataRequest (to send info about endpoints for different security protocols). Since this is used in intra-cluster communication, we need to do the upgrade in two steps. The idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2. When upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After step 1, all brokers will be capable for processing the new protocol in 0.8.3, but without actually using it. In step 2, we configure wire.protocol.version to 0.8.3 in each broker and do another rolling restart. After step 2, all brokers will start using the new protocol in 0.8.3. Let's say that in the next release 0.9, we are changing the intra-cluster wire protocol again. We will do the similar thing: defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can upgrade from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2 to 0.9 directly, they will have to configure wire.protocol.version to 0.8.2 first and then do the two-step upgrade to 0.9. Gwen, In KIP2, there is still a reference to use.new.protocol. This needs to be removed. Also, would it be better to use intra.cluster.wire.protocol.version since this only applies to the wire protocol among brokers? Others, The patch in KAFKA-1809 is almost ready. It would be good to wrap up the discussion on KIP2 soon. So, if you haven't looked at this KIP, please take a look and send your comments. Thanks, Jun On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, While reviewing the patch for KAFKA-1809, we came across two questions that we are interested in hearing the community out on. 1. This patch changes the Broker class and adds a new class BrokerEndPoint that behaves like the previous broker. While technically kafka.cluster.Broker is not part of the public API, it is returned by javaapi, used with the SimpleConsumer. Getting replicas from PartitionMetadata will now return BrokerEndPoint instead of Broker. All method calls remain the same, but since we return a new type, we break the API. Note that this breakage does not prevent upgrades - existing SimpleConsumers will continue working (because we are wire-compatible). The only thing that won't work is building SimpleConsumers with dependency on Kafka versions higher than 0.8.2. Arguably, we don't want anyone to do it anyway :) So: Do we state that the highest release on which SimpleConsumers can depend is 0.8.2? Or shall we keep Broker as is and create an UberBroker which will contain multiple brokers as its endpoints? 2. The KIP suggests use.new.wire.protocol configuration to decide which protocols the brokers will use to talk to each other. The problem is that after the next upgrade, the wire protocol is no longer new, so we'll have to reset it to false for the following upgrade, then change to true again... and upgrading more than a single version will be impossible. Bad idea :) As an alternative, we can have a property for each version and set one of them to true. Or (simple, I think) have wire.protocol.version property and accept version numbers (0.8.2, 0.8.3, 0.9) as values. 
Please share your thoughts :) Gwen
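To make the proposed upgrade path concrete, the broker configuration across the two rolling restarts would presumably look like this (property name per the discussion above; values illustrative):
{code}
# Step 1: roll all brokers onto 0.8.3 binaries while pinning the old wire protocol
wire.protocol.version=0.8.2

# Step 2: once every broker runs 0.8.3, flip the version and roll again
wire.protocol.version=0.8.3
{code}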
[jira] [Updated] (KAFKA-1938) [doc] Quick start example should reference appropriate Kafka version
[ https://issues.apache.org/jira/browse/KAFKA-1938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1938: --- Attachment: remove-081-references.patch Attaching a patch which removes 0.8.1 references from 0.8.2 docs. [doc] Quick start example should reference appropriate Kafka version Key: KAFKA-1938 URL: https://issues.apache.org/jira/browse/KAFKA-1938 Project: Kafka Issue Type: Improvement Components: website Affects Versions: 0.8.2.0 Reporter: Stevo Slavic Priority: Trivial Attachments: remove-081-references.patch Kafka 0.8.2.0 documentation, quick start example on https://kafka.apache.org/documentation.html#quickstart in step 1 links and instructs reader to download Kafka 0.8.1.1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics
[ https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316691#comment-14316691 ] Manikumar Reddy commented on KAFKA-1374: yes, we are using TestLogCleaning tool to test the changes. TestLogCleaning stress test output for compressed messages
{code}
Producing 100000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-6014466306002699464.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-177538909590644701.txt
100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
De-duplicating and validating output files...
Validated 9005 values, 0 mismatches.

Producing 1000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-3298578695475992991.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 10000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-3336255463347572934.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.
{code}
LogCleaner (compaction) does not support compressed topics -- Key: KAFKA-1374 URL: https://issues.apache.org/jira/browse/KAFKA-1374 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, KAFKA-1374_2015-01-18_00:19:21.patch This is a known issue, but opening a ticket to track. If you try to compact a topic that has compressed messages you will run into various exceptions - typically because during iteration we advance the position based on the decompressed size of the message. I have a bunch of stack traces, but it should be straightforward to reproduce. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-1887) controller error message on shutting the last broker
[ https://issues.apache.org/jira/browse/KAFKA-1887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani reassigned KAFKA-1887: - Assignee: Sriharsha Chintalapani controller error message on shutting the last broker Key: KAFKA-1887 URL: https://issues.apache.org/jira/browse/KAFKA-1887 Project: Kafka Issue Type: Bug Components: core Reporter: Jun Rao Assignee: Sriharsha Chintalapani Priority: Minor Fix For: 0.8.3 We always see the following error in state-change log on shutting down the last broker. [2015-01-20 13:21:04,036] ERROR Controller 0 epoch 3 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger) kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)] at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75) at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:357) at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:206) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117) at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:446) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1920) Add a metric to count client side errors in BrokerTopicMetrics
[ https://issues.apache.org/jira/browse/KAFKA-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya Auradkar updated KAFKA-1920: --- Issue Type: Sub-task (was: Improvement) Parent: KAFKA-1946 Add a metric to count client side errors in BrokerTopicMetrics -- Key: KAFKA-1920 URL: https://issues.apache.org/jira/browse/KAFKA-1920 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Currently the BrokerTopicMetrics count only failures across all topics and for individual topics. Should we consider adding a metric to count the number of client side errors? This essentially counts the number of bad requests per topic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics
[ https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316714#comment-14316714 ] Joel Koshy commented on KAFKA-1374: --- I can review this next week. However, as far as checking in is concerned I would strongly prefer to get KAFKA-1755 done first (for which I have a patch almost ready). The reason for that is that this patch is a significant change to the log cleaner and I would rather get some defensive code in first since the log cleaner health is critical for offset management as well as for Samza use-cases. LogCleaner (compaction) does not support compressed topics -- Key: KAFKA-1374 URL: https://issues.apache.org/jira/browse/KAFKA-1374 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, KAFKA-1374_2015-01-18_00:19:21.patch This is a known issue, but opening a ticket to track. If you try to compact a topic that has compressed messages you will run into various exceptions - typically because during iteration we advance the position based on the decompressed size of the message. I have a bunch of stack traces, but it should be straightforward to reproduce. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1936) Track offset commit requests separately from produce requests
[ https://issues.apache.org/jira/browse/KAFKA-1936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya Auradkar updated KAFKA-1936: --- Issue Type: Sub-task (was: Bug) Parent: KAFKA-1946 Track offset commit requests separately from produce requests - Key: KAFKA-1936 URL: https://issues.apache.org/jira/browse/KAFKA-1936 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar In ReplicaManager, failed and total produce requests are updated from appendToLocalLog. Since offset commit requests also follow the same path, they are counted along with produce requests. Add a metric and count them separately. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1945) MetaData Response - Broker hostname is wrong
[ https://issues.apache.org/jira/browse/KAFKA-1945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy resolved KAFKA-1945. --- Resolution: Invalid MetaData Response - Broker hostname is wrong Key: KAFKA-1945 URL: https://issues.apache.org/jira/browse/KAFKA-1945 Project: Kafka Issue Type: Bug Components: core Reporter: saravana kumar I use python-kafka's SimpleConsumer to listen to a topic in kafka broker. Kafka broker is running on a machine with its hostname as BROKER_HOST. Now, SimpleConsumer from another machine requests for topic metadata from the broker BROKER_HOST for a topic TOPIC gets a python tuple (Broker metadata, Topic metadata) Broker metadata comes as, {0: BrokerMetadata(nodeId=0, host='localhost', port=9092)} ideally, host value must be BROKER_HOST(hostname cmd from broker shell tty confirms it) but it comes as localhost... How does the wrong broker metadata for a topic get into kafka system? And obviously, this breaks the system since my consumer tries to connect to 9092 on its localhost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
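For anyone hitting the same symptom: the usual cause (and presumably why this was resolved as invalid) is broker configuration rather than a bug, since the broker advertises whatever its configured or default hostname resolves to. Something along these lines in server.properties fixes it (hostname is a placeholder):
{code}
# server.properties: advertise an externally resolvable address instead of localhost
advertised.host.name=broker1.example.com
advertised.port=9092
{code}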
Re: Build failed in Jenkins: Kafka-trunk #389
I ran into this issue locally too. I left it running overnight and it was stuck when I got back to it. This is where it was when I killed the terminal this morning: kafka.admin.TopicCommandTest testConfigPreservationAcrossPartitionAlteration PASSED kafka.api.ApiUtilsTest testShortStringASCII PASSED kafka.api.ApiUtilsTest testShortStringNonASCII PASSED Building 56% :core:test 36 tests completed I ran it again just now and the tests passed fine. ~ Joe Stein On Mon, Feb 9, 2015 at 8:08 PM, Apache Jenkins Server jenk...@builds.apache.org wrote: See https://builds.apache.org/job/Kafka-trunk/389/changes Changes: [wangguoz] KAFKA-1333; Add the consumer coordinator to server; reviewed by Onur Karaman and Jay Kreps [wangguoz] KAFKA-1333 follow-up; Add missing files for the coordinator folder -- [...truncated 1789 lines...] (the remainder of the build log is a long run of PASSED results for kafka.producer.AsyncProducerTest, kafka.log.CleanerTest, kafka.log.BrokerCompressionTest, kafka.log.LogManagerTest, kafka.log.LogConfigTest, kafka.log.OffsetIndexTest and kafka.log.FileMessageSetTest, cut off mid-listing)
[jira] [Commented] (KAFKA-1944) Rename LogCleaner and related classes to LogCompactor
[ https://issues.apache.org/jira/browse/KAFKA-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316222#comment-14316222 ] Joel Koshy commented on KAFKA-1944: --- Sure - I think this can wait until that is in. Rename LogCleaner and related classes to LogCompactor - Key: KAFKA-1944 URL: https://issues.apache.org/jira/browse/KAFKA-1944 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Ashish Kumar Singh Labels: newbie Following a mailing list discussion: the name LogCleaner is seriously misleading. It's more of a log compactor. Deleting old logs happens elsewhere from what I've seen. Note that this may require renaming related classes, objects, configs and metrics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils
[ https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316380#comment-14316380 ] Tong Li commented on KAFKA-1926: [~harsha_ch] Yeah, will do that. Thanks. Replace kafka.utils.Utils with o.a.k.common.utils.Utils --- Key: KAFKA-1926 URL: https://issues.apache.org/jira/browse/KAFKA-1926 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2.0 Reporter: Jay Kreps Labels: newbie, patch Attachments: KAFKA-1926.patch There is currently a lot of duplication between the Utils class in common and the one in core. Our plan has been to deprecate duplicate code in the server and replace it with the new common code. As such we should evaluate each method in the scala Utils and do one of the following: 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose utility in active use that is not Kafka-specific. If we migrate it we should really think about the API and make sure there is some test coverage. A few things in there are kind of funky and we shouldn't just blindly copy them over. 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold any utilities that really need to make use of Scala features to be convenient. 3. Delete it if it is not used, or has a bad api. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils
[ https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316319#comment-14316319 ] Sriharsha Chintalapani commented on KAFKA-1926: --- [~tongli] you should make the patch against trunk. Replace kafka.utils.Utils with o.a.k.common.utils.Utils --- Key: KAFKA-1926 URL: https://issues.apache.org/jira/browse/KAFKA-1926 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2.0 Reporter: Jay Kreps Labels: newbie, patch Attachments: KAFKA-1926.patch There is currently a lot of duplication between the Utils class in common and the one in core. Our plan has been to deprecate duplicate code in the server and replace it with the new common code. As such we should evaluate each method in the scala Utils and do one of the following: 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose utility in active use that is not Kafka-specific. If we migrate it we should really think about the API and make sure there is some test coverage. A few things in there are kind of funky and we shouldn't just blindly copy them over. 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold any utilities that really need to make use of Scala features to be convenient. 3. Delete it if it is not used, or has a bad api. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1944) Rename LogCleaner and related classes to LogCompactor
[ https://issues.apache.org/jira/browse/KAFKA-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316951#comment-14316951 ] Ashish Kumar Singh commented on KAFKA-1944: --- Sure. Rename LogCleaner and related classes to LogCompactor - Key: KAFKA-1944 URL: https://issues.apache.org/jira/browse/KAFKA-1944 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Ashish Kumar Singh Labels: newbie Following a mailing list discussion: the name LogCleaner is seriously misleading. It's more of a log compactor. Deleting old logs happens elsewhere from what I've seen. Note that this may require renaming related classes, objects, configs and metrics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
Cool, I agree with all that. I agree about the need for a rebalancing callback. Totally agree about record handler. It would be great to see if a prototype of this is workable. Thanks guys! -Jay On Wed, Feb 11, 2015 at 12:36 PM, Joel Koshy jjkosh...@gmail.com wrote: Hey Jay, Guozhang, Becket and I got together to discuss this and we think: - It seems that your proposal based on the new consumer and flush call should work. - We would likely need to call the poll with a timeout that matches the offset commit interval in order to deal with low volume mirroring pipelines. - We will still need a rebalance callback to reduce duplicates - the rebalance callback would need to flush and commit offsets. - The only remaining question is if the overall throughput is sufficient. I think someone at LinkedIn (I don't remember who) did some experiments with data channel size == 1 and ran into issues. That was not thoroughly investigated though. - The addition of flush may actually make this solution viable for the current mirror-maker (with the old consumer). We can prototype that offline and if it works out well we can redo KAFKA-1650 (i.e., refactor the current mirror maker). The flush call and the new consumer didn't exist at the time we did KAFKA-1650 so this did not occur to us. - We think the RecordHandler is still a useful small addition for the use-cases mentioned earlier in this thread. Thanks, Joel On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote: Guozhang, I agree with 1-3, I do think what I was proposing was simpler but perhaps there are gaps in that? Hey Joel--Here was a sketch of what I was proposing. I do think this gets rid of manual offset tracking, especially doing so across threads with dedicated commit threads, which I think is pretty complex. while (true) { val recs = consumer.poll(Long.MaxValue); for (rec <- recs) producer.send(rec, logErrorCallback) if (System.currentTimeMillis - lastCommit > commitInterval) { producer.flush() consumer.commit() lastCommit = System.currentTimeMillis } } (See the previous email for details). I think the question is: is there any reason--performance, correctness, etc--that this won't work? Basically I think you guys have thought about this more so I may be missing something. If so let's flag it while we still have leeway on the consumer. If we think that will work, well I do think it is conceptually a lot simpler than the current code, though I suppose one could disagree on that. -Jay On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy jjkosh...@gmail.com wrote: Hi Jay, The data channels are actually a big part of the complexity of the zero data loss design, though, right? Because then you need some reverse channel to flow the acks back to the consumer based on where you are versus just acking what you have read and written (as in the code snippet I put up). I'm not sure if we are on the same page. Even if the data channel was not there the current handling for zero data loss would remain very similar - you would need to maintain lists of unacked source offsets. I'm wondering if the KIP needs more detail on how it is currently implemented; or are you suggesting a different approach (in which case I have not fully understood). I'm not sure what you mean by flowing acks back to the consumer - the MM commits offsets after the producer ack has been received. There is some additional complexity introduced in reducing duplicates on a rebalance - this is actually optional (since duplicates are currently a given).
The reason that was done anyway is that with auto-commit turned off, duplicates are almost guaranteed on a rebalance. I think the point that Neha and I were trying to make was that the motivation to embed stuff into MM is kind of related to how complex a simple consume-and-produce with good throughput will be. If it is simple to write such a thing in a few lines, the pain of embedding a bunch of stuff won't be worth it; if it has to be as complex as the current MM then of course we will need all kinds of plug-ins because no one will be able to write such a thing. I don't have a huge concern with a simple plug-in but I think if it turns into something more complex with filtering and aggregation or whatever we really need to stop and think a bit about the design. I agree - I don't think there is a use-case for any complex plug-in. It is pretty much what Becket has described currently for the message handler - i.e., take an incoming record and return a list of outgoing records (which could be empty if you filter). So here is my take on the MM: - Bare bones: simple consumer -> producer pairs (0.7 style). This is ideal, but does not handle no data loss
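For concreteness, here is a minimal runnable sketch of the loop under discussion, written against the new Java clients. The configs, topic name, and poll timeout are illustrative assumptions rather than part of the proposal; flush() is the then-new producer call the thread refers to.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MirrorLoopSketch {
    public static void main(String[] args) {
        Properties cons = new Properties();
        cons.put("bootstrap.servers", "localhost:9092"); // illustrative
        cons.put("group.id", "mirror-maker");
        cons.put("enable.auto.commit", "false"); // offsets are committed manually below
        cons.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        cons.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Properties prod = new Properties();
        prod.put("bootstrap.servers", "localhost:9092"); // illustrative
        prod.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        prod.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        long commitInterval = 60_000L; // poll timeout matches the commit interval (Joel's point)
        long lastCommit = System.currentTimeMillis();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cons);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(prod)) {
            consumer.subscribe(Collections.singletonList("mirror-topic"));
            while (true) {
                // Bounded poll so low-volume pipelines still reach the commit branch.
                ConsumerRecords<byte[], byte[]> recs = consumer.poll(commitInterval);
                for (ConsumerRecord<byte[], byte[]> rec : recs)
                    producer.send(new ProducerRecord<>(rec.topic(), rec.key(), rec.value()),
                                  (md, e) -> { if (e != null) e.printStackTrace(); });
                if (System.currentTimeMillis() - lastCommit > commitInterval) {
                    producer.flush();      // wait until every outstanding send is acked
                    consumer.commitSync(); // only then commit the consumed offsets
                    lastCommit = System.currentTimeMillis();
                }
            }
        }
    }
}

The ordering matters: because flush() blocks until every outstanding send is acked, the commitSync() that follows never commits an offset for a record the producer could still lose.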
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
Hey Jay, Guozhang, Becket and I got together to discuss this and we think: - It seems that your proposal based on the new consumer and flush call should work. - We would likely need to call the poll with a timeout that matches the offset commit interval in order to deal with low volume mirroring pipelines. - We will still need a rebalance callback to reduce duplicates - the rebalance callback would need to flush and commit offsets. - The only remaining question is if the overall throughput is sufficient. I think someone at LinkedIn (I don't remember who) did some experiments with data channel size == 1 and ran into issues. That was not thoroughly investigated though. - The addition of flush may actually make this solution viable for the current mirror-maker (with the old consumer). We can prototype that offline and if it works out well we can redo KAFKA-1650 (i.e., refactor the current mirror maker). The flush call and the new consumer didn't exist at the time we did KAFKA-1650 so this did not occur to us. - We think the RecordHandler is still a useful small addition for the use-cases mentioned earlier in this thread. Thanks, Joel On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote: Guozhang, I agree with 1-3, I do think what I was proposing was simpler but perhaps there are gaps in that? Hey Joel--Here was a sketch of what I was proposing. I do think this gets rid of manual offset tracking, especially doing so across threads with dedicated commit threads, which I think is pretty complex. while (true) { val recs = consumer.poll(Long.MaxValue); for (rec <- recs) producer.send(rec, logErrorCallback) if (System.currentTimeMillis - lastCommit > commitInterval) { producer.flush() consumer.commit() lastCommit = System.currentTimeMillis } } (See the previous email for details). I think the question is: is there any reason--performance, correctness, etc--that this won't work? Basically I think you guys have thought about this more so I may be missing something. If so let's flag it while we still have leeway on the consumer. If we think that will work, well I do think it is conceptually a lot simpler than the current code, though I suppose one could disagree on that. -Jay On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy jjkosh...@gmail.com wrote: Hi Jay, The data channels are actually a big part of the complexity of the zero data loss design, though, right? Because then you need some reverse channel to flow the acks back to the consumer based on where you are versus just acking what you have read and written (as in the code snippet I put up). I'm not sure if we are on the same page. Even if the data channel was not there the current handling for zero data loss would remain very similar - you would need to maintain lists of unacked source offsets. I'm wondering if the KIP needs more detail on how it is currently implemented; or are you suggesting a different approach (in which case I have not fully understood). I'm not sure what you mean by flowing acks back to the consumer - the MM commits offsets after the producer ack has been received. There is some additional complexity introduced in reducing duplicates on a rebalance - this is actually optional (since duplicates are currently a given). The reason that was done anyway is that with auto-commit turned off, duplicates are almost guaranteed on a rebalance. I think the point that Neha and I were trying to make was that the motivation to embed stuff into MM is kind of related to how complex a simple consume-and-produce with good throughput will be.
If it is simple to write such a thing in a few lines, the pain of embedding a bunch of stuff won't be worth it; if it has to be as complex as the current MM then of course we will need all kinds of plug-ins because no one will be able to write such a thing. I don't have a huge concern with a simple plug-in but I think if it turns into something more complex with filtering and aggregation or whatever we really need to stop and think a bit about the design. I agree - I don't think there is a use-case for any complex plug-in. It is pretty much what Becket has described currently for the message handler - i.e., take an incoming record and return a list of outgoing records (which could be empty if you filter). So here is my take on the MM: - Bare bones: simple consumer -> producer pairs (0.7 style). This is ideal, but does not handle no data loss - Above plus support no data loss. This actually adds quite a bit of complexity. - Above plus the message handler. This is a trivial addition I think that makes the MM usable in a few other mirroring-like applications. Joel On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy jjkosh...@gmail.com wrote: On Tue, Feb 10, 2015 at 12:13:46PM -0800,
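The message-handler surface described here is small enough to sketch in a few lines; the names below are illustrative assumptions, not a committed MirrorMaker API.

import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public interface RecordHandler {
    // Return an empty list to filter a record out, or several records to fan out.
    List<ProducerRecord<byte[], byte[]>> handle(ConsumerRecord<byte[], byte[]> record);

    // Default behavior: mirror every record unchanged.
    RecordHandler IDENTITY = record -> Collections.singletonList(
            new ProducerRecord<>(record.topic(), record.key(), record.value()));
}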
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Hey Andrii, To answer your earlier question we just really can't be adding any more scala protocol objects. These things are super hard to maintain because they hand code the byte parsing and don't have good versioning support. Since we are already planning on converting we definitely don't want to add a ton more of these--they are total tech debt. What does it mean that the changes are isolated from the current code base? I actually didn't understand the remaining comments, which of the points are you responding to? Maybe one sticking point here is that it seems like you want to make some kind of tool, and you have made a 1-1 mapping between commands you imagine in the tool and protocol additions. I want to make sure we don't do that. The protocol needs to be really really well thought out against many use cases so it should make perfect logical sense in the absence of knowing the command line tool, right? -Jay On Wed, Feb 11, 2015 at 11:57 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Hey Jay, I would like to continue this discussion as it seems there is no progress here. First of all, could you please explain what you meant in 2? How exactly are we going to migrate to the new java protocol definitions? And why is it a blocker for centralized CLI? I agree with you, this feature includes lots of stuff, but thankfully almost all changes are isolated from the current code base, so the main thing, I think, we need to agree on is the RQ/RP format. So how can we start a discussion about the concrete message format? Can we take ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-ProposedRQ/RPFormat ) as a starting point? We had some doubts earlier whether it's worth introducing one generic Admin Request for all commands (https://issues.apache.org/jira/browse/KAFKA-1694 ) but then everybody agreed it would be better to have a separate message for each admin command. The Request part is really dictated by the command (e.g. TopicCommand) arguments itself, so the proposed version should be fine (let's put aside for now remarks about Optional type, batching, configs normalization - I agree with all of them). So the second part is the Response. I see there are two cases here. a) Mutate requests - Create/Alter/... ; b) Get requests - List/Describe... a) should only hold the request result (regardless of what we decide about blocking/non-blocking command execution). Usually we provide an error code in the response but since we will use this in an interactive shell we need some human-readable error description - so I added an errorDescription field where you can at least leave exception.getMessage. b) in addition to the previous item, the message should hold command-specific response data. We can discuss each of them in detail but let's for now agree on the overall pattern. Thanks, Andrii Biletskyi On Fri, Jan 23, 2015 at 6:59 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, This is great. A few comments on KIP-4 1. This is much needed functionality, but there are a lot of these, so let's really think these protocols through. We really want to end up with a set of well thought-out, orthogonal apis. For this reason I think it is really important to think through the end state even if that includes APIs we won't implement in the first phase. 2. Let's please please please wait until we have switched the server over to the new java protocol definitions.
If we add umpteen more ad hoc scala objects that is just generating more work for the conversion we know we have to do. 3. This proposal introduces a new type of optional parameter. This is inconsistent with everything else in the protocol where we use -1 or some other marker value. You could argue either way but let's stick with that for consistency. For clients that implemented the protocol in a better way than our scala code these basic primitives are hard to change. 4. ClusterMetadata: This seems to duplicate TopicMetadataRequest which has brokers, topics, and partitions. I think we should rename that request ClusterMetadataRequest (or just MetadataRequest) and include the id of the controller. Or are there other things we could add here? 5. We have a tendency to try to make a lot of requests that can only go to particular nodes. This adds a lot of burden for client implementations (it sounds easy but each discovery can fail in many parts so it ends up being a full state machine to do right). I think we should consider making admin commands and ideally as many of the other apis as possible available on all brokers and just redirect to the controller on the broker side. Perhaps there would be a general way to encapsulate this re-routing behavior. 6. We should probably normalize the key-value pairs used for configs rather than embedding a new formatting.
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Hey Jay, I would like to continue this discussion as it seems there is no progress here. First of all, could you please explain what you meant in 2? How exactly are we going to migrate to the new java protocol definitions? And why is it a blocker for centralized CLI? I agree with you, this feature includes lots of stuff, but thankfully almost all changes are isolated from the current code base, so the main thing, I think, we need to agree on is the RQ/RP format. So how can we start a discussion about the concrete message format? Can we take ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-ProposedRQ/RPFormat) as a starting point? We had some doubts earlier whether it's worth introducing one generic Admin Request for all commands (https://issues.apache.org/jira/browse/KAFKA-1694) but then everybody agreed it would be better to have a separate message for each admin command. The Request part is really dictated by the command (e.g. TopicCommand) arguments itself, so the proposed version should be fine (let's put aside for now remarks about Optional type, batching, configs normalization - I agree with all of them). So the second part is the Response. I see there are two cases here. a) Mutate requests - Create/Alter/... ; b) Get requests - List/Describe... a) should only hold the request result (regardless of what we decide about blocking/non-blocking command execution). Usually we provide an error code in the response but since we will use this in an interactive shell we need some human-readable error description - so I added an errorDescription field where you can at least leave exception.getMessage. b) in addition to the previous item, the message should hold command-specific response data. We can discuss each of them in detail but let's for now agree on the overall pattern. Thanks, Andrii Biletskyi On Fri, Jan 23, 2015 at 6:59 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, This is great. A few comments on KIP-4 1. This is much needed functionality, but there are a lot of these, so let's really think these protocols through. We really want to end up with a set of well thought-out, orthogonal apis. For this reason I think it is really important to think through the end state even if that includes APIs we won't implement in the first phase. 2. Let's please please please wait until we have switched the server over to the new java protocol definitions. If we add umpteen more ad hoc scala objects that is just generating more work for the conversion we know we have to do. 3. This proposal introduces a new type of optional parameter. This is inconsistent with everything else in the protocol where we use -1 or some other marker value. You could argue either way but let's stick with that for consistency. For clients that implemented the protocol in a better way than our scala code these basic primitives are hard to change. 4. ClusterMetadata: This seems to duplicate TopicMetadataRequest which has brokers, topics, and partitions. I think we should rename that request ClusterMetadataRequest (or just MetadataRequest) and include the id of the controller. Or are there other things we could add here? 5. We have a tendency to try to make a lot of requests that can only go to particular nodes. This adds a lot of burden for client implementations (it sounds easy but each discovery can fail in many parts so it ends up being a full state machine to do right).
I think we should consider making admin commands and ideally as many of the other apis as possible available on all brokers and just redirect to the controller on the broker side. Perhaps there would be a general way to encapsulate this re-routing behavior. 6. We should probably normalize the key-value pairs used for configs rather than embedding a new formatting. So two strings rather than one with an internal equals sign. 7. Is the postcondition of these APIs that the command has begun or that the command has been completed? It is a lot more usable if the command has been completed so you know that if you create a topic and then publish to it you won't get an exception about there being no such topic. 8. Describe topic and list topics duplicate a lot of stuff in the metadata request. Is there a reason to give back topics marked for deletion? I feel like if we just make the post-condition of the delete command be that the topic is deleted that will get rid of the need for this right? And it will be much more intuitive. 9. Should we consider batching these requests? We have generally tried to allow multiple operations to be batched. My suspicion is that without this we will get a lot of code that does something like for (topic : adminClient.listTopics()) adminClient.describeTopic(topic); this code will work great when you test on 5 topics but not do as well if you have 50k. 10. I think we should also discuss how we want
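Point 9 is easiest to see in code. The AdminClient below is hypothetical (no such client existed at the time); it just makes the N+1 round-trip pattern and the batched alternative concrete.

import java.util.List;

class TopicDescription { /* partitions, replicas, configs... */ }

interface AdminClient {
    List<String> listTopics();
    TopicDescription describeTopic(String topic);                // one request per topic
    List<TopicDescription> describeTopics(List<String> topics);  // one request for the whole batch
}

class BatchingExample {
    static void describeAll(AdminClient admin) {
        // Unbatched: fine with 5 topics, 50k round trips with 50k topics.
        for (String t : admin.listTopics())
            admin.describeTopic(t);
        // Batched: a single round trip regardless of topic count.
        admin.describeTopics(admin.listTopics());
    }
}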
Re: Review Request 29468: Patch for KAFKA-1805
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/#review72032 --- Can you add unit tests? - Gwen Shapira On Feb. 11, 2015, 10:34 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/ --- (Updated Feb. 11, 2015, 10:34 p.m.) Review request for kafka. Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42 https://issues.apache.org/jira/browse/KAFKA-1805 https://issues.apache.org/jira/browse/KAFKA-1905 https://issues.apache.org/jira/browse/KAFKA-42 Repository: kafka Description --- Merge remote-tracking branch 'origin/trunk' into KAFKA-1805 Handling the case where all the fields in ProducerRecord can be null. Adding toString back. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 065d4e6c6a4966ac216e98696782e2714044df29 Diff: https://reviews.apache.org/r/29468/diff/ Testing --- Thanks, Parth Brahmbhatt
Re: Review Request 29468: Patch for KAFKA-1805
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/ --- (Updated Feb. 11, 2015, 10:34 p.m.) Review request for kafka. Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42 https://issues.apache.org/jira/browse/KAFKA-1805 https://issues.apache.org/jira/browse/KAFKA-1905 https://issues.apache.org/jira/browse/KAFKA-42 Repository: kafka Description (updated) --- Merge remote-tracking branch 'origin/trunk' into KAFKA-1805 Handling the case where all the fields in ProducerRecord can be null. Adding toString back. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 065d4e6c6a4966ac216e98696782e2714044df29 Diff: https://reviews.apache.org/r/29468/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317118#comment-14317118 ] Parth Brahmbhatt commented on KAFKA-1805: - Updated reviewboard https://reviews.apache.org/r/29468/diff/ against branch origin/trunk Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
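The fix is essentially null-safe value-object equality. Below is a sketch under the assumption of the 0.8.2-era fields (topic, partition, key, value); the committed patch may differ in detail.

import java.util.Objects;

public final class ProducerRecordSketch<K, V> {
    private final String topic;
    private final Integer partition; // nullable
    private final K key;             // nullable
    private final V value;           // nullable

    public ProducerRecordSketch(String topic, Integer partition, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ProducerRecordSketch)) return false;
        ProducerRecordSketch<?, ?> that = (ProducerRecordSketch<?, ?>) o;
        // Objects.equals handles the all-fields-can-be-null case directly.
        return Objects.equals(topic, that.topic)
            && Objects.equals(partition, that.partition)
            && Objects.equals(key, that.key)
            && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition, key, value);
    }
}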
[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-1805: Attachment: KAFKA-1805_2015-02-11_14:34:28.patch Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317108#comment-14317108 ] Parth Brahmbhatt commented on KAFKA-1805: - Updated reviewboard https://reviews.apache.org/r/29468/diff/ against branch origin/trunk Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29468: Patch for KAFKA-1805
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/ --- (Updated Feb. 11, 2015, 10:30 p.m.) Review request for kafka. Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42 https://issues.apache.org/jira/browse/KAFKA-1805 https://issues.apache.org/jira/browse/KAFKA-1905 https://issues.apache.org/jira/browse/KAFKA-42 Repository: kafka Description (updated) --- Merge remote-tracking branch 'origin/trunk' into KAFKA-1805 Handling the case where all the fields in ProducerRecord can be null. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 065d4e6c6a4966ac216e98696782e2714044df29 Diff: https://reviews.apache.org/r/29468/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-1805: Attachment: KAFKA-1805_2015-02-11_14:30:14.patch Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29468: Patch for KAFKA-1805
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/ --- (Updated Feb. 11, 2015, 10:37 p.m.) Review request for kafka. Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42 https://issues.apache.org/jira/browse/KAFKA-1805 https://issues.apache.org/jira/browse/KAFKA-1905 https://issues.apache.org/jira/browse/KAFKA-42 Repository: kafka Description --- Merge remote-tracking branch 'origin/trunk' into KAFKA-1805 Handling the case where all the fields in ProducerRecord can be null. Adding toString back. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 065d4e6c6a4966ac216e98696782e2714044df29 Diff: https://reviews.apache.org/r/29468/diff/ Testing --- Thanks, Parth Brahmbhatt
Re: Review Request 29468: Patch for KAFKA-1805
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/#review72031 --- clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java https://reviews.apache.org/r/29468/#comment117965 not intentional, removed. clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java https://reviews.apache.org/r/29468/#comment117966 handling null for all fields now. clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java https://reviews.apache.org/r/29468/#comment117967 nulls are handled now. - Parth Brahmbhatt On Feb. 11, 2015, 10:53 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/ --- (Updated Feb. 11, 2015, 10:53 p.m.) Review request for kafka. Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42 https://issues.apache.org/jira/browse/KAFKA-1805 https://issues.apache.org/jira/browse/KAFKA-1905 https://issues.apache.org/jira/browse/KAFKA-42 Repository: kafka Description --- Merge remote-tracking branch 'origin/trunk' into KAFKA-1805 Handling the case where all the fields in ProducerRecord can be null. Adding toString back. Added unit test for equals and hashCode. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 065d4e6c6a4966ac216e98696782e2714044df29 clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java PRE-CREATION Diff: https://reviews.apache.org/r/29468/diff/ Testing --- Unit tests added. Thanks, Parth Brahmbhatt
Re: Review Request 29468: Patch for KAFKA-1805
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/ --- (Updated Feb. 11, 2015, 10:53 p.m.) Review request for kafka. Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42 https://issues.apache.org/jira/browse/KAFKA-1805 https://issues.apache.org/jira/browse/KAFKA-1905 https://issues.apache.org/jira/browse/KAFKA-42 Repository: kafka Description --- Merge remote-tracking branch 'origin/trunk' into KAFKA-1805 Handling the case where all the fields in ProducerRecord can be null. Adding toString back. Added unit test for equals and hashCode. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 065d4e6c6a4966ac216e98696782e2714044df29 clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java PRE-CREATION Diff: https://reviews.apache.org/r/29468/diff/ Testing (updated) --- Unit tests added. Thanks, Parth Brahmbhatt
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Jay, Thanks for answering. You understood correctly, most of my comments were related to your point 1) - about well thought-out apis. Also, yes, as I understood, we would like to introduce a single unified CLI tool with centralized server-side request handling for lots of existing ones (incl. TopicCommand, CommitOffsetChecker, ReassignPartitions, something else if added in the future). In our previous discussion ( https://issues.apache.org/jira/browse/KAFKA-1694) people said they'd rather have a separate message for each command, so, yes, this way I came to a 1-1 mapping between commands in the tool and protocol additions. But I might be wrong. In the end I am just trying to start a discussion about how this protocol should generally look. Thanks, Andrii On Wed, Feb 11, 2015 at 11:10 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Andrii, To answer your earlier question we just really can't be adding any more scala protocol objects. These things are super hard to maintain because they hand code the byte parsing and don't have good versioning support. Since we are already planning on converting we definitely don't want to add a ton more of these--they are total tech debt. What does it mean that the changes are isolated from the current code base? I actually didn't understand the remaining comments, which of the points are you responding to? Maybe one sticking point here is that it seems like you want to make some kind of tool, and you have made a 1-1 mapping between commands you imagine in the tool and protocol additions. I want to make sure we don't do that. The protocol needs to be really really well thought out against many use cases so it should make perfect logical sense in the absence of knowing the command line tool, right? -Jay On Wed, Feb 11, 2015 at 11:57 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Hey Jay, I would like to continue this discussion as it seems there is no progress here. First of all, could you please explain what you meant in 2? How exactly are we going to migrate to the new java protocol definitions? And why is it a blocker for centralized CLI? I agree with you, this feature includes lots of stuff, but thankfully almost all changes are isolated from the current code base, so the main thing, I think, we need to agree on is the RQ/RP format. So how can we start a discussion about the concrete message format? Can we take ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-ProposedRQ/RPFormat ) as a starting point? We had some doubts earlier whether it's worth introducing one generic Admin Request for all commands ( https://issues.apache.org/jira/browse/KAFKA-1694 ) but then everybody agreed it would be better to have a separate message for each admin command. The Request part is really dictated by the command (e.g. TopicCommand) arguments itself, so the proposed version should be fine (let's put aside for now remarks about Optional type, batching, configs normalization - I agree with all of them). So the second part is the Response. I see there are two cases here. a) Mutate requests - Create/Alter/... ; b) Get requests - List/Describe... a) should only hold the request result (regardless of what we decide about blocking/non-blocking command execution). Usually we provide an error code in the response but since we will use this in an interactive shell we need some human-readable error description - so I added an errorDescription field where you can at least leave exception.getMessage.
b) in addition to the previous item, the message should hold command-specific response data. We can discuss each of them in detail but let's for now agree on the overall pattern. Thanks, Andrii Biletskyi On Fri, Jan 23, 2015 at 6:59 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, This is great. A few comments on KIP-4 1. This is much needed functionality, but there are a lot of these, so let's really think these protocols through. We really want to end up with a set of well thought-out, orthogonal apis. For this reason I think it is really important to think through the end state even if that includes APIs we won't implement in the first phase. 2. Let's please please please wait until we have switched the server over to the new java protocol definitions. If we add umpteen more ad hoc scala objects that is just generating more work for the conversion we know we have to do. 3. This proposal introduces a new type of optional parameter. This is inconsistent with everything else in the protocol where we use -1 or some other marker value. You could argue either way but let's stick with that for consistency. For clients that implemented the protocol in a better way than our scala code these basic primitives are hard to change. 4.
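A sketch of the mutate-response shape being proposed, i.e. an error code plus a human-readable description for the interactive shell; the class and field names come from the discussion, not from any committed schema.

public class CreateTopicResponse {
    private final short errorCode;          // 0 = no error
    private final String errorDescription;  // e.g. exception.getMessage(), may be empty

    public CreateTopicResponse(short errorCode, String errorDescription) {
        this.errorCode = errorCode;
        this.errorDescription = errorDescription;
    }

    public boolean hasError() { return errorCode != 0; }
    public short errorCode() { return errorCode; }
    public String errorDescription() { return errorDescription; }
}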
Re: Review Request 29468: Patch for KAFKA-1805
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/ --- (Updated Feb. 11, 2015, 10:49 p.m.) Review request for kafka. Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42 https://issues.apache.org/jira/browse/KAFKA-1805 https://issues.apache.org/jira/browse/KAFKA-1905 https://issues.apache.org/jira/browse/KAFKA-42 Repository: kafka Description (updated) --- Merge remote-tracking branch 'origin/trunk' into KAFKA-1805 Handling the case where all the fields in ProducerRecord can be null. Adding toString back. Added unit test for equals and hashCode. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 065d4e6c6a4966ac216e98696782e2714044df29 clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java PRE-CREATION Diff: https://reviews.apache.org/r/29468/diff/ Testing --- Thanks, Parth Brahmbhatt
Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception
If people have agreed upon this semantic: "if you set retries > 0 you are saying I accept duplicates but want to ensure my stuff gets written, if you set retries = 0 you are saying I can't abide duplicates and am willing to tolerate loss. So Retryable for us means retry may succeed." then NotEnoughReplicasAfterAppend should be retriable. PS: we can probably make it clearer in the new producer config table? Guozhang On Wed, Feb 11, 2015 at 5:41 AM, Joel Koshy jjkosh...@gmail.com wrote: Thanks for the comments - however, it is not clear to me what your preference is on making NotEnoughReplicasAfterAppend retriable vs non-retriable. As for me, my preference is to leave it as retriable since it is clear that the produce may succeed on a retry (and may introduce a duplicate). I agree that idempotence will bring full closure to this though. Anyone else have a preference on this? Thanks, Joel On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote: Yeah there are really two concepts here as I think you noted: 1. Retry safe: we know that the write did not occur 2. Retry fixable: if you send that again it could work (probably there are better names for these). Some things we know did not do a write and may be fixed by retrying (no leader). Some things we know didn't do a write and are not worth retrying (message too large). Some things we don't know and are worth retrying (network error), and probably some things we don't know and aren't worth it (can't think of one though). (I feel like Donald Rumsfeld with the known unknowns thing). In the current world if you set retries > 0 you are saying I accept duplicates but want to ensure my stuff gets written, if you set retries = 0 you are saying I can't abide duplicates and am willing to tolerate loss. So Retryable for us means retry may succeed. Originally I thought of maybe trying to model both concepts. However the two arguments against it are: 1. Even if you do this the guarantee remains at least once delivery because: (1) in the network error case you just don't know, (2) consumer failure. 2. The proper fix for this is to add idempotence support on the server, which we should do. Doing idempotence support on the server will actually fix all duplicate problems, including the network error case (because of course the server knows whether your write went through even though the client doesn't). When we have that then the client can always just retry anything marked Retriable (i.e. retry may work) without fear of duplicates. This gives exactly once delivery to the log, and a co-operating consumer can use the offset to dedupe and get it end-to-end. So that was why I had just left one type of Retriable and used it to mean retry may work and don't try to flag anything for duplicates. -Jay On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, Need your thoughts on retriable exceptions: If a user configures Kafka with min.isr > 1 and there are not enough replicas to safely store the data, there are two possibilities: 1. The lack of replicas was discovered before the message was written. We throw NotEnoughReplicas. 2. The lack of replicas was discovered after the message was written to leader. In this case, we throw NotEnoughReplicasAfterAppend. Currently, both errors are Retriable, which means that the new producer will retry multiple times. In case of the second exception, this will cause duplicates.
KAFKA-1697 suggests: we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. I agreed that the client (the one using the Producer) should weigh the problems duplicates will cause vs. the probability of losing the message and do something sensible, and made the exception non-retriable. In the RB (https://reviews.apache.org/r/29647/) Joel raised a good point: (Joel, feel free to correct me if I misrepresented your point) I think our interpretation of retriable is as follows (but we can discuss on the list if that needs to change): if the produce request hits an error, and there is absolutely no point in retrying then that is a non-retriable error. MessageSizeTooLarge is an example - since unless the producer changes the request to make the messages smaller there is no point in retrying. ... Duplicates can arise even for other errors (e.g., request timed out). So that side-effect is not compelling enough to warrant a change to make this non-retriable. (TL;DR) Should exceptions where retries can cause duplicates still be retriable? Gwen -- -- Guozhang
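For reference, this is what the retriable/non-retriable split looks like from a client's side, using the new clients' RetriableException marker; the callback itself is an illustrative sketch, not Kafka code.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;

public class SendResultLogger implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e == null)
            return; // the write was acked
        if (e instanceof RetriableException)
            // retry may succeed (e.g. NotEnoughReplicasAfterAppend), at the
            // cost of a possible duplicate -- exactly the trade-off above
            System.err.println("Retriable send failure: " + e);
        else
            // retrying cannot help (e.g. a message that is simply too large)
            System.err.println("Non-retriable send failure: " + e);
    }
}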
Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception
Makes sense to me. Thanks for the very detailed clarification, Jay :) Will leave NotEnoughReplicasAfterAppend as retriable. Gwen On Wed, Feb 11, 2015 at 4:18 PM, Guozhang Wang wangg...@gmail.com wrote: If people have agreed upon this semantic: "if you set retries > 0 you are saying I accept duplicates but want to ensure my stuff gets written, if you set retries = 0 you are saying I can't abide duplicates and am willing to tolerate loss. So Retryable for us means retry may succeed." then NotEnoughReplicasAfterAppend should be retriable. PS: we can probably make it clearer in the new producer config table? Guozhang On Wed, Feb 11, 2015 at 5:41 AM, Joel Koshy jjkosh...@gmail.com wrote: Thanks for the comments - however, it is not clear to me what your preference is on making NotEnoughReplicasAfterAppend retriable vs non-retriable. As for me, my preference is to leave it as retriable since it is clear that the produce may succeed on a retry (and may introduce a duplicate). I agree that idempotence will bring full closure to this though. Anyone else have a preference on this? Thanks, Joel On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote: Yeah there are really two concepts here as I think you noted: 1. Retry safe: we know that the write did not occur 2. Retry fixable: if you send that again it could work (probably there are better names for these). Some things we know did not do a write and may be fixed by retrying (no leader). Some things we know didn't do a write and are not worth retrying (message too large). Some things we don't know and are worth retrying (network error), and probably some things we don't know and aren't worth it (can't think of one though). (I feel like Donald Rumsfeld with the known unknowns thing). In the current world if you set retries > 0 you are saying I accept duplicates but want to ensure my stuff gets written, if you set retries = 0 you are saying I can't abide duplicates and am willing to tolerate loss. So Retryable for us means retry may succeed. Originally I thought of maybe trying to model both concepts. However the two arguments against it are: 1. Even if you do this the guarantee remains at least once delivery because: (1) in the network error case you just don't know, (2) consumer failure. 2. The proper fix for this is to add idempotence support on the server, which we should do. Doing idempotence support on the server will actually fix all duplicate problems, including the network error case (because of course the server knows whether your write went through even though the client doesn't). When we have that then the client can always just retry anything marked Retriable (i.e. retry may work) without fear of duplicates. This gives exactly once delivery to the log, and a co-operating consumer can use the offset to dedupe and get it end-to-end. So that was why I had just left one type of Retriable and used it to mean retry may work and don't try to flag anything for duplicates. -Jay On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, Need your thoughts on retriable exceptions: If a user configures Kafka with min.isr > 1 and there are not enough replicas to safely store the data, there are two possibilities: 1. The lack of replicas was discovered before the message was written. We throw NotEnoughReplicas. 2. The lack of replicas was discovered after the message was written to leader. In this case, we throw NotEnoughReplicasAfterAppend. Currently, both errors are Retriable,
which means that the new producer will retry multiple times. In case of the second exception, this will cause duplicates. KAFKA-1697 suggests: we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. I agreed that the client (the one using the Producer) should weigh the problems duplicates will cause vs. the probability of losing the message and do something sensible, and made the exception non-retriable. In the RB (https://reviews.apache.org/r/29647/) Joel raised a good point: (Joel, feel free to correct me if I misrepresented your point) I think our interpretation of retriable is as follows (but we can discuss on the list if that needs to change): if the produce request hits an error, and there is absolutely no point in retrying then that is a non-retriable error. MessageSizeTooLarge is an example - since unless the producer changes the request to make the messages smaller there is no point in retrying. ... Duplicates can arise even for other errors (e.g., request timed out). So that side-effect is not compelling enough to
Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints
The description that Jun gave for (2) was the detail I was looking for - Gwen can you update the KIP with that for completeness/clarity? I'm +1 as well overall. However, I think it would be good if we also get an ack from someone who is more experienced on the operations side (say, Todd) to review especially the upgrade plan. On Wed, Feb 11, 2015 at 09:40:50AM -0800, Jun Rao wrote: +1 for proposed changes in 1 and 2. 1. The impact is that if someone uses SimpleConsumer and references Broker explicitly, the application needs a code change to compile with 0.8.3. Since SimpleConsumer is not widely used, breaking the API in SimpleConsumer but maintaining overall code cleanness seems to be a better tradeoff. 2. For clarification, the issue is the following. In 0.8.3, we will be evolving the wire protocol of UpdateMetadataRequest (to send info about endpoints for different security protocols). Since this is used in intra-cluster communication, we need to do the upgrade in two steps. The idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2. When upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After step 1, all brokers will be capable of processing the new protocol in 0.8.3, but without actually using it. In step 2, we configure wire.protocol.version to 0.8.3 in each broker and do another rolling restart. After step 2, all brokers will start using the new protocol in 0.8.3. Let's say that in the next release 0.9, we are changing the intra-cluster wire protocol again. We will do a similar thing: defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can upgrade from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2 to 0.9 directly, they will have to configure wire.protocol.version to 0.8.2 first and then do the two-step upgrade to 0.9. Gwen, In KIP2, there is still a reference to use.new.protocol. This needs to be removed. Also, would it be better to use intra.cluster.wire.protocol.version since this only applies to the wire protocol among brokers? Others, The patch in KAFKA-1809 is almost ready. It would be good to wrap up the discussion on KIP2 soon. So, if you haven't looked at this KIP, please take a look and send your comments. Thanks, Jun On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, While reviewing the patch for KAFKA-1809, we came across two questions that we are interested in hearing the community out on. 1. This patch changes the Broker class and adds a new class BrokerEndPoint that behaves like the previous broker. While technically kafka.cluster.Broker is not part of the public API, it is returned by javaapi, used with the SimpleConsumer. Getting replicas from PartitionMetadata will now return BrokerEndPoint instead of Broker. All method calls remain the same, but since we return a new type, we break the API. Note that this breakage does not prevent upgrades - existing SimpleConsumers will continue working (because we are wire-compatible). The only thing that won't work is building SimpleConsumers with a dependency on Kafka versions higher than 0.8.2. Arguably, we don't want anyone to do it anyway :) So: Do we state that the highest release on which SimpleConsumers can depend is 0.8.2? Or shall we keep Broker as is and create an UberBroker which will contain multiple brokers as its endpoints? 2. The KIP suggests use.new.wire.protocol configuration to decide which protocols the brokers will use to talk to each other.
The problem is that after the next upgrade, the wire protocol is no longer new, so we'll have to reset it to false for the following upgrade, then change to true again... and upgrading more than a single version will be impossible. Bad idea :) As an alternative, we can have a property for each version and set one of them to true. Or (simple, I think) have wire.protocol.version property and accept version numbers (0.8.2, 0.8.3, 0.9) as values. Please share your thoughts :) Gwen
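Jun's scheme reduces to one config change between two rolling restarts. A sketch of the broker config at each step, using the proposed property name (whether it should be wire.protocol.version or intra.cluster.wire.protocol.version was still open):

# Step 1: rolling upgrade to 0.8.3 binaries; brokers still speak the 0.8.2 protocol
wire.protocol.version=0.8.2

# Step 2: second rolling restart; brokers switch to the new intra-cluster protocol
wire.protocol.version=0.8.3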
Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints
Added Jun's notes to the KIP (Thanks for explaining so clearly, Jun. I was clearly struggling with this...) and removed the reference to use.new.protocol. On Wed, Feb 11, 2015 at 4:19 PM, Joel Koshy jjkosh...@gmail.com wrote: The description that Jun gave for (2) was the detail I was looking for - Gwen can you update the KIP with that for completeness/clarity? I'm +1 as well overall. However, I think it would be good if we also get an ack from someone who is more experienced on the operations side (say, Todd) to review especially the upgrade plan. On Wed, Feb 11, 2015 at 09:40:50AM -0800, Jun Rao wrote: +1 for proposed changes in 1 and 2. 1. The impact is that if someone uses SimpleConsumer and references Broker explicitly, the application needs a code change to compile with 0.8.3. Since SimpleConsumer is not widely used, breaking the API in SimpleConsumer but maintaining overall code cleanness seems to be a better tradeoff. 2. For clarification, the issue is the following. In 0.8.3, we will be evolving the wire protocol of UpdateMetadataRequest (to send info about endpoints for different security protocols). Since this is used in intra-cluster communication, we need to do the upgrade in two steps. The idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2. When upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After step 1, all brokers will be capable of processing the new protocol in 0.8.3, but without actually using it. In step 2, we configure wire.protocol.version to 0.8.3 in each broker and do another rolling restart. After step 2, all brokers will start using the new protocol in 0.8.3. Let's say that in the next release 0.9, we are changing the intra-cluster wire protocol again. We will do a similar thing: defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can upgrade from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2 to 0.9 directly, they will have to configure wire.protocol.version to 0.8.2 first and then do the two-step upgrade to 0.9. Gwen, In KIP2, there is still a reference to use.new.protocol. This needs to be removed. Also, would it be better to use intra.cluster.wire.protocol.version since this only applies to the wire protocol among brokers? Others, The patch in KAFKA-1809 is almost ready. It would be good to wrap up the discussion on KIP2 soon. So, if you haven't looked at this KIP, please take a look and send your comments. Thanks, Jun On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, While reviewing the patch for KAFKA-1809, we came across two questions that we are interested in hearing the community out on. 1. This patch changes the Broker class and adds a new class BrokerEndPoint that behaves like the previous broker. While technically kafka.cluster.Broker is not part of the public API, it is returned by javaapi, used with the SimpleConsumer. Getting replicas from PartitionMetadata will now return BrokerEndPoint instead of Broker. All method calls remain the same, but since we return a new type, we break the API. Note that this breakage does not prevent upgrades - existing SimpleConsumers will continue working (because we are wire-compatible). The only thing that won't work is building SimpleConsumers with a dependency on Kafka versions higher than 0.8.2. Arguably, we don't want anyone to do it anyway :) So: Do we state that the highest release on which SimpleConsumers can depend is 0.8.2?
Or shall we keep Broker as is and create an UberBroker which will contain multiple brokers as its endpoints? 2. The KIP suggests use.new.wire.protocol configuration to decide which protocols the brokers will use to talk to each other. The problem is that after the next upgrade, the wire protocol is no longer new, so we'll have to reset it to false for the following upgrade, then change to true again... and upgrading more than a single version will be impossible. Bad idea :) As an alternative, we can have a property for each version and set one of them to true. Or (simple, I think) have wire.protocol.version property and accept version numbers (0.8.2, 0.8.3, 0.9) as values. Please share your thoughts :) Gwen
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Yeah I totally agree that we don't want to just have one "do admin stuff" command that has the union of all parameters. What I am saying is that command line tools are one client of the administrative apis, but these will be used in a number of scenarios so they should make logical sense even in the absence of the command line tool. Hence comments like trying to clarify the relationship between ClusterMetadata and TopicMetadata...these kinds of things really need to be thought through. Hope that makes sense. -Jay On Wed, Feb 11, 2015 at 1:41 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Jay, Thanks for answering. You understood correctly, most of my comments were related to your point 1) - about well thought-out apis. Also, yes, as I understood we would like to introduce a single unified CLI tool with centralized server-side request handling for lots of existing ones (incl. TopicCommand, CommitOffsetChecker, ReassignPartitions, something else if added in the future). In our previous discussion ( https://issues.apache.org/jira/browse/KAFKA-1694) people said they'd rather have a separate message for each command, so, yes, this way I came to a 1-1 mapping between commands in the tool and protocol additions. But I might be wrong. In the end I'm just trying to start a discussion about how, at least generally, this protocol should look. Thanks, Andrii On Wed, Feb 11, 2015 at 11:10 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Andrii, To answer your earlier question we just really can't be adding any more scala protocol objects. These things are super hard to maintain because they hand code the byte parsing and don't have good versioning support. Since we are already planning on converting we definitely don't want to add a ton more of these--they are total tech debt. What does it mean that the changes are isolated from the current code base? I actually didn't understand the remaining comments, which of the points are you responding to? Maybe one sticking point here is that it seems like you want to make some kind of tool, and you have made a 1-1 mapping between commands you imagine in the tool and protocol additions. I want to make sure we don't do that. The protocol needs to be really really well thought out against many use cases so it should make perfect logical sense in the absence of knowing the command line tool, right? -Jay On Wed, Feb 11, 2015 at 11:57 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Hey Jay, I would like to continue this discussion as it seems there is no progress here. First of all, could you please explain what you meant in 2? How exactly are we going to migrate to the new Java protocol definitions? And why is it a blocker for the centralized CLI? I agree with you, this feature includes lots of stuff, but thankfully almost all changes are isolated from the current code base, so the main thing, I think, we need to agree on is the RQ/RP format. So how can we start a discussion about the concrete message format? Can we take ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-ProposedRQ/RPFormat ) as a starting point? We had some doubts earlier whether it's worth introducing one generic Admin Request for all commands ( https://issues.apache.org/jira/browse/KAFKA-1694 ) but then everybody agreed it would be better to have a separate message for each admin command. The Request part is really dictated by the command (e.g.
TopicCommand) arguments itself, so the proposed version should be fine (let's put aside for now remarks about the Optional type, batching, configs normalization - I agree with all of them). So the second part is the Response. I see there are two cases here. a) Mutate requests - Create/Alter/... ; b) Get requests - List/Describe... a) should only hold the request result (regardless of what we decide about blocking/non-blocking command execution). Usually we provide an error code in the response, but since we will use this in an interactive shell we need some human-readable error description - so I added an errorDescription field where you can at least leave exception.getMessage. b) in addition to the previous item, the message should hold command-specific response data. We can discuss each of them in detail, but let's for now agree about the overall pattern. Thanks, Andrii Biletskyi On Fri, Jan 23, 2015 at 6:59 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, This is great. A few comments on KIP-4 1. This is much needed functionality, but there are a lot of these, so let's really think these protocols through. We really want to end up with a set of well thought-out, orthogonal apis. For this reason I think it is really important to think through the
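To give the mutate-response shape Andrii describes a concrete form, a sketch in the usual protocol-BNF style (the message name and field names here are illustrative, not taken from the KIP):

CreateTopicResponse => ErrorCode ErrorDescription
  ErrorCode => int16
  ErrorDescription => string

Here ErrorCode stays machine-readable while ErrorDescription carries the human-readable detail (e.g. exception.getMessage) for the interactive shell.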
Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints
Jun, I'm not sure we should default wire.protocol.version to the previous version. This will make fresh installs a bit weird :) I think we should default to the new version and assume that when I'm upgrading a broker, I'm re-using an existing configuration file. This way, if I'm upgrading 0.8.3.0 to 0.9.0.0, the configuration file already says wire.protocol.version=0.8.3.0 and I need to bump it post-upgrade. A fresh install will include 0.9.0.0, so I won't need to bump anything. The only exception is with 0.8.2.0, where I'll need to add wire.protocol.version=0.8.2.0 before upgrading to 0.8.3.0. Does that make sense? Regarding the naming, I agree that this parameter only controls the protocol between brokers (clients control the version of the protocol when they are involved, on a per-message basis). However, inter.broker.wire.protocol.version makes it sound like there may be other types of wire.protocol.version in the future, and I'm pretty sure we want a single parameter for controlling protocol versions from the broker side. Not a big deal for me either way. On Wed, Feb 11, 2015 at 9:40 AM, Jun Rao j...@confluent.io wrote: +1 for proposed changes in 1 and 2. 1. The impact is that if someone uses SimpleConsumer and references Broker explicitly, the application needs a code change to compile with 0.8.3. Since SimpleConsumer is not widely used, breaking the API in SimpleConsumer but maintaining overall code cleanness seems to be a better tradeoff. 2. For clarification, the issue is the following. In 0.8.3, we will be evolving the wire protocol of UpdateMetadataRequest (to send info about endpoints for different security protocols). Since this is used in intra-cluster communication, we need to do the upgrade in two steps. The idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2. When upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After step 1, all brokers will be capable of processing the new protocol in 0.8.3, but without actually using it. In step 2, we configure wire.protocol.version to 0.8.3 in each broker and do another rolling restart. After step 2, all brokers will start using the new protocol in 0.8.3. Let's say that in the next release 0.9, we are changing the intra-cluster wire protocol again. We will do a similar thing: defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can upgrade from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2 to 0.9 directly, they will have to configure wire.protocol.version to 0.8.2 first and then do the two-step upgrade to 0.9. Gwen, In KIP-2, there is still a reference to use.new.protocol. This needs to be removed. Also, would it be better to use intra.cluster.wire.protocol.version since this only applies to the wire protocol among brokers? Others, The patch in KAFKA-1809 is almost ready. It would be good to wrap up the discussion on KIP-2 soon. So, if you haven't looked at this KIP, please take a look and send your comments. Thanks, Jun On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, While reviewing the patch for KAFKA-1809, we came across two questions that we are interested in hearing the community out on. 1. This patch changes the Broker class and adds a new class BrokerEndPoint that behaves like the previous broker. While technically kafka.cluster.Broker is not part of the public API, it is returned by javaapi, used with the SimpleConsumer. Getting replicas from PartitionMetadata will now return BrokerEndPoint instead of Broker.
All method calls remain the same, but since we return a new type, we break the API. Note that this breakage does not prevent upgrades - existing SimpleConsumers will continue working (because we are wire-compatible). The only thing that won't work is building SimpleConsumers with dependency on Kafka versions higher than 0.8.2. Arguably, we don't want anyone to do it anyway :) So: Do we state that the highest release on which SimpleConsumers can depend is 0.8.2? Or shall we keep Broker as is and create an UberBroker which will contain multiple brokers as its endpoints? 2. The KIP suggests use.new.wire.protocol configuration to decide which protocols the brokers will use to talk to each other. The problem is that after the next upgrade, the wire protocol is no longer new, so we'll have to reset it to false for the following upgrade, then change to true again... and upgrading more than a single version will be impossible. Bad idea :) As an alternative, we can have a property for each version and set one of them to true. Or (simple, I think) have wire.protocol.version property and accept version numbers (0.8.2, 0.8.3, 0.9) as values. Please share your thoughts :) Gwen
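As a sketch of the two-step upgrade Jun describes (assuming the wire.protocol.version property lands as proposed; the property name and version strings are illustrative, not from a shipped release):

# Step 1: install the 0.8.3 broker jar, but keep speaking the 0.8.2 protocol
wire.protocol.version=0.8.2
# ... rolling restart of all brokers ...

# Step 2: once every broker runs 0.8.3, switch the intra-cluster protocol
wire.protocol.version=0.8.3
# ... second rolling restart ...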
[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-1805: Attachment: KAFKA-1805_2015-02-11_14:37:41.patch Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
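As a rough, Java-6-friendly illustration of what a value-object equals/hashCode looks like for a record of this shape (a simplified stand-in, not the actual KAFKA-1805 patch; per the review thread, every field other than topic may be null):

public final class RecordValue<K, V> {
    private final String topic;
    private final Integer partition; // may be null
    private final K key;             // may be null
    private final V value;           // may be null

    public RecordValue(String topic, Integer partition, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RecordValue)) return false;
        RecordValue<?, ?> that = (RecordValue<?, ?>) o;
        return eq(topic, that.topic) && eq(partition, that.partition)
            && eq(key, that.key) && eq(value, that.value);
    }

    @Override
    public int hashCode() {
        // standard 31-based hash, null fields contribute 0
        int result = topic == null ? 0 : topic.hashCode();
        result = 31 * result + (partition == null ? 0 : partition.hashCode());
        result = 31 * result + (key == null ? 0 : key.hashCode());
        result = 31 * result + (value == null ? 0 : value.hashCode());
        return result;
    }

    // null-safe equality check (java.util.Objects.equals is Java 7+)
    private static boolean eq(Object a, Object b) {
        return a == null ? b == null : a.equals(b);
    }
}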
[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317124#comment-14317124 ] Parth Brahmbhatt commented on KAFKA-1805: - Updated reviewboard https://reviews.apache.org/r/29468/diff/ against branch origin/trunk Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317123#comment-14317123 ] Parth Brahmbhatt commented on KAFKA-1805: - Updated reviewboard https://reviews.apache.org/r/29468/diff/ against branch origin/trunk Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29468: Patch for KAFKA-1805
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/ --- (Updated Feb. 11, 2015, 10:37 p.m.) Review request for kafka. Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42 https://issues.apache.org/jira/browse/KAFKA-1805 https://issues.apache.org/jira/browse/KAFKA-1905 https://issues.apache.org/jira/browse/KAFKA-42 Repository: kafka Description --- Merge remote-tracking branch 'origin/trunk' into KAFKA-1805 Handling the case where all the fields in ProducerRecord can be null. Adding toString back. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 065d4e6c6a4966ac216e98696782e2714044df29 Diff: https://reviews.apache.org/r/29468/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-1805: Attachment: KAFKA-1805_2015-02-11_14:37:09.patch Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-1805: Attachment: KAFKA-1805_2015-02-11_14:49:10.patch Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch, KAFKA-1805_2015-02-11_14:49:10.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29468: Patch for KAFKA-1805
On Feb. 11, 2015, 10:35 p.m., Gwen Shapira wrote: Can you add unit tests? Added unit tests. - Parth --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/#review72032 --- On Feb. 11, 2015, 10:49 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29468/ --- (Updated Feb. 11, 2015, 10:49 p.m.) Review request for kafka. Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42 https://issues.apache.org/jira/browse/KAFKA-1805 https://issues.apache.org/jira/browse/KAFKA-1905 https://issues.apache.org/jira/browse/KAFKA-42 Repository: kafka Description --- Merge remote-tracking branch 'origin/trunk' into KAFKA-1805 Handling the case where all the fields in ProducerRecord can be null. Adding toString back. Added unit tests for equals and hashCode. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 065d4e6c6a4966ac216e98696782e2714044df29 clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java PRE-CREATION Diff: https://reviews.apache.org/r/29468/diff/ Testing --- Thanks, Parth Brahmbhatt
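A unit test along these lines might look like the following (a sketch assuming the patched ProducerRecord implements equals/hashCode; the 0.8.2-era four-argument constructor is used, and the class name is hypothetical):

import static org.junit.Assert.assertEquals;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

public class ProducerRecordEqualityTest {

    @Test
    public void recordsWithEqualFieldsAreEqual() {
        ProducerRecord<String, String> a = new ProducerRecord<String, String>("topic", 0, "key", "value");
        ProducerRecord<String, String> b = new ProducerRecord<String, String>("topic", 0, "key", "value");
        assertEquals(a, b);
        assertEquals(a.hashCode(), b.hashCode());
    }

    @Test
    public void nullFieldsDoNotBreakEquality() {
        // partition, key, and value may all be null
        ProducerRecord<String, String> a = new ProducerRecord<String, String>("topic", null, null, null);
        ProducerRecord<String, String> b = new ProducerRecord<String, String>("topic", null, null, null);
        assertEquals(a, b);
        assertEquals(a.hashCode(), b.hashCode());
    }
}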
[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317145#comment-14317145 ] Parth Brahmbhatt commented on KAFKA-1805: - Updated reviewboard https://reviews.apache.org/r/29468/diff/ against branch origin/trunk Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2.0 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch, KAFKA-1805_2015-02-11_14:49:10.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.
[ https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317171#comment-14317171 ] Parth Brahmbhatt commented on KAFKA-1660: - I am not aware of any clean way to force-shutdown a running Java thread; how do you propose to do that in the second approach? Ability to call close() with a timeout on the Java Kafka Producer. --- Key: KAFKA-1660 URL: https://issues.apache.org/jira/browse/KAFKA-1660 Project: Kafka Issue Type: Improvement Components: clients, producer Affects Versions: 0.8.2.0 Reporter: Andrew Stein Assignee: Jun Rao Fix For: 0.8.3 Attachments: KAFKA-1660.patch I would like the ability to call {{close}} with a timeout on the Java Client's KafkaProducer. h6. Workaround Currently, it is possible to ensure that {{close}} will return quickly by first doing a {{future.get(timeout)}} on the last future produced on each partition, but this means that the user has to define the partitions up front at the time of {{send}} and track the returned {{future}}s -- This message was sent by Atlassian JIRA (v6.3.4#6332)
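The workaround described in the ticket might look roughly like this (a sketch; the wrapper class is hypothetical, the per-partition bookkeeping is the application's responsibility, and the partition must be chosen at send() time):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BoundedCloseProducer {
    private final KafkaProducer<byte[], byte[]> producer;
    // last future returned by send() for each partition
    private final Map<Integer, Future<RecordMetadata>> lastFuture =
        new HashMap<Integer, Future<RecordMetadata>>();

    public BoundedCloseProducer(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    public void send(ProducerRecord<byte[], byte[]> record) {
        // record.partition() must be non-null for this tracking to work
        lastFuture.put(record.partition(), producer.send(record));
    }

    public void closeWithin(long timeoutMs) {
        for (Future<RecordMetadata> f : lastFuture.values()) {
            try {
                // sends are ordered per partition, so waiting on the last
                // future bounds the wait for everything before it too
                f.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                // timed out or failed: stop waiting on this partition
            }
        }
        producer.close();
    }
}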
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Looks good. Thanks for working on this. One note: the Channel implementation from SSL only works on Java 7 and up. Since we are still supporting Java 6, I'm working on a lighter wrapper that will be a composite over SocketChannel but will not extend it. Perhaps you'll want to use that. Looking forward to the patch! Gwen On Wed, Feb 11, 2015 at 9:17 AM, Harsha m...@harsha.io wrote: Hi, Here is the initial proposal for sasl/kerberos implementation for kafka https://cwiki.apache.org/confluence/x/YI4WAw and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with a initial prototype patch. Thanks, Harsha
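The composite wrapper Gwen mentions might look roughly like this (the class name and surface are guesses; the point is delegating to a SocketChannel rather than extending it, which keeps the same shape workable on Java 6):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Holds a SocketChannel instead of extending it; protocol-specific
// framing and handshakes (SSL, SASL) can be layered inside read()/write().
public class WrappedChannel {
    private final SocketChannel channel;

    public WrappedChannel(SocketChannel channel) {
        this.channel = channel;
    }

    public int read(ByteBuffer dst) throws IOException {
        return channel.read(dst);
    }

    public int write(ByteBuffer src) throws IOException {
        return channel.write(src);
    }

    public void close() throws IOException {
        channel.close();
    }
}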
Re: Review Request 29647: Patch for KAFKA-1697
On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java, line 23 https://reviews.apache.org/r/29647/diff/3/?file=860600#file860600line23 Pending discussion. Gwen Shapira wrote: Changing back to retriable, per discussion on the mailing list. I'm leaving this as a separate exception and error code, in case client developers want to do something with the extra information. Actually, since we are here, I'll remove the error code from kafka.common and use the o.a.k.common.errors everywhere. We are transitioning there anyway. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/#review71960 --- On Feb. 11, 2015, 1:06 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/ --- (Updated Feb. 11, 2015, 1:06 a.m.) Review request for kafka. Bugs: KAFKA-1697 https://issues.apache.org/jira/browse/KAFKA-1697 Repository: kafka Description --- added early handling of invalid number of acks to handler and a test merging with current trunk moved check for legal requiredAcks to append and fixed the tests accordingly Diffs - clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java a6107b818947d6d6818c85cdffcb2b13f69a55c0 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java a8deac4ce5149129d0a6f44c0526af9d55649a36 core/src/main/scala/kafka/cluster/Partition.scala e6ad8be5e33b6fb31c078ad78f8de709869ddc04 core/src/main/scala/kafka/server/KafkaApis.scala 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala faa907131ed0aa94a7eacb78c1ffb576062be87a Diff: https://reviews.apache.org/r/29647/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.
[ https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317335#comment-14317335 ] Jay Kreps commented on KAFKA-1660: -- Yeah I'm not proposing calling thread.stop(), we would stop the thread by sending it a message somehow to stop processing and then it exits without waiting for all messages to be sent. Basically the same way we implement close() without a timeout (which also doesn't call thread.stop). Ability to call close() with a timeout on the Java Kafka Producer. --- Key: KAFKA-1660 URL: https://issues.apache.org/jira/browse/KAFKA-1660 Project: Kafka Issue Type: Improvement Components: clients, producer Affects Versions: 0.8.2.0 Reporter: Andrew Stein Assignee: Jun Rao Fix For: 0.8.3 Attachments: KAFKA-1660.patch I would like the ability to call {{close}} with a timeout on the Java Client's KafkaProducer. h6. Workaround Currently, it is possible to ensure that {{close}} will return quickly by first doing a {{future.get(timeout)}} on the last future produced on each partition, but this means that the user has to define the partitions up front at the time of {{send}} and track the returned {{future}}s -- This message was sent by Atlassian JIRA (v6.3.4#6332)
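What Jay describes - signalling the I/O thread rather than calling Thread.stop() - could be sketched like this (a generic illustration, not Kafka's actual Sender class):

public class SenderLoop implements Runnable {
    private volatile boolean forceClose = false;

    public void run() {
        while (!forceClose) {
            // drain queued batches and write them to the sockets;
            // the flag is checked once per iteration
        }
        // flag was set: exit immediately, abandoning unsent messages
    }

    // called from close(timeout) once the timeout expires
    public void initiateForceClose() {
        forceClose = true;
    }
}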
Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception
Yeah, hey Joel, that was a super long-winded way of saying let's leave it at Retriable. I agree there is another concept which is non-duplicate producing, but when we do the idempotence stuff then all things will have that property so it may be okay just to leave it for now since network errors make it impossible to really do this properly without idempotence. -Jay On Wed, Feb 11, 2015 at 5:41 AM, Joel Koshy jjkosh...@gmail.com wrote: Thanks for the comments - however, it is not clear to me what your preference is on making NotEnoughReplicasAfterAppend retriable vs non-retriable. As for me, my preference is to leave it as retriable since it is clear that the produce may succeed on a retry (and may introduce a duplicate). I agree that idempotence will bring full closure to this though. Anyone else have a preference on this? Thanks, Joel On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote: Yeah there are really two concepts here as I think you noted: 1. Retry safe: we know that the write did not occur 2. Retry fixable: if you send that again it could work (probably there are better names for these). Some things we know did not do a write and may be fixed by retrying (no leader). Some things we know didn't do a write and are not worth retrying (message too large). Some things we don't know and are worth retrying (network error), and probably some things we don't know and aren't worth it (can't think of one though). (I feel like Donald Rumsfeld with the known unknowns thing). In the current world if you set retries > 0 you are saying I accept duplicates but want to ensure my stuff gets written, if you set retries = 0 you are saying I can't abide duplicates and am willing to tolerate loss. So Retryable for us means retry may succeed. Originally I thought of maybe trying to model both concepts. However the two arguments against it are: 1. Even if you do this the guarantee remains at least once delivery because: (1) in the network error case you just don't know, (2) consumer failure. 2. The proper fix for this is to add idempotence support on the server, which we should do. Doing idempotence support on the server will actually fix all duplicate problems, including the network error case (because of course the server knows whether your write went through even though the client doesn't). When we have that then the client can always just retry anything marked Retriable (i.e. retry may work) without fear of duplicates. This gives exactly once delivery to the log, and a co-operating consumer can use the offset to dedupe and get it end-to-end. So that was why I had just left one type of Retriable and used it to mean retry may work and don't try to flag anything for duplicates. -Jay On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, Need your thoughts on retriable exceptions: If a user configures Kafka with min.isr > 1 and there are not enough replicas to safely store the data, there are two possibilities: 1. The lack of replicas was discovered before the message was written. We throw NotEnoughReplicas. 2. The lack of replicas was discovered after the message was written to the leader. In this case, we throw NotEnoughReplicasAfterAppend. Currently, both errors are Retriable. Which means that the new producer will retry multiple times. In case of the second exception, this will cause duplicates. KAFKA-1697 suggests: we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do.
I agreed that the client (the one using the Producer) should weigh the problems duplicates will cause vs. the probability of losing the message, do something sensible, and made the exception non-retriable. In the RB (https://reviews.apache.org/r/29647/) Joel raised a good point: (Joel, feel free to correct me if I misrepresented your point) I think our interpretation of retriable is as follows (but we can discuss on the list if that needs to change): if the produce request hits an error, and there is absolutely no point in retrying then that is a non-retriable error. MessageSizeTooLarge is an example - since unless the producer changes the request to make the messages smaller there is no point in retrying. ... Duplicates can arise even for other errors (e.g., request timed out). So that side-effect is not compelling enough to warrant a change to make this non-retriable. (TL;DR) Should exceptions where retries can cause duplicates still be retriable? Gwen
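In client terms, the semantics under debate look like this (a sketch; the producer normally retries internally based on whether the error extends RetriableException, and the class name here is hypothetical):

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;

public class RetrySemantics {
    public static void sendOnce(KafkaProducer<byte[], byte[]> producer,
                                ProducerRecord<byte[], byte[]> record)
            throws InterruptedException {
        try {
            producer.send(record).get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof RetriableException) {
                // "retry may succeed": resending could work, but the broker
                // may already have appended the message, so a duplicate is
                // possible (the NotEnoughReplicasAfterAppend case)
            } else {
                // non-retriable: resending the identical request cannot
                // help (the MessageSizeTooLarge case)
            }
        }
    }
}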
Re: Review Request 29647: Patch for KAFKA-1697
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/ --- (Updated Feb. 12, 2015, 2:45 a.m.) Review request for kafka. Bugs: KAFKA-1697 https://issues.apache.org/jira/browse/KAFKA-1697 Repository: kafka Description (updated) --- added early handling of invalid number of acks to handler and a test merging with current trunk moved check for legal requiredAcks to append and fixed the tests accordingly changing exception back to retriable Diffs (updated) - clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java a6107b818947d6d6818c85cdffcb2b13f69a55c0 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java a8deac4ce5149129d0a6f44c0526af9d55649a36 core/src/main/scala/kafka/cluster/Partition.scala e6ad8be5e33b6fb31c078ad78f8de709869ddc04 core/src/main/scala/kafka/server/KafkaApis.scala 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala a1f72f8c2042ff2a43af503b2e06f84706dad9db core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala faa907131ed0aa94a7eacb78c1ffb576062be87a Diff: https://reviews.apache.org/r/29647/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Updated] (KAFKA-1697) remove code related to ack>1 on the broker
[ https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1697: Attachment: KAFKA-1697_2015-02-11_18:45:42.patch remove code related to ack>1 on the broker -- Key: KAFKA-1697 URL: https://issues.apache.org/jira/browse/KAFKA-1697 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-1697.patch, KAFKA-1697_2015-01-14_15:41:37.patch, KAFKA-1697_2015-02-10_17:06:51.patch, KAFKA-1697_2015-02-11_18:45:42.patch We removed the ack>1 support from the producer client in KAFKA-1555. We can completely remove the code in the broker that supports ack>1. Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1697) remove code related to ack>1 on the broker
[ https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317471#comment-14317471 ] Gwen Shapira commented on KAFKA-1697: - Updated reviewboard https://reviews.apache.org/r/29647/diff/ against branch trunk remove code related to ack>1 on the broker -- Key: KAFKA-1697 URL: https://issues.apache.org/jira/browse/KAFKA-1697 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-1697.patch, KAFKA-1697_2015-01-14_15:41:37.patch, KAFKA-1697_2015-02-10_17:06:51.patch, KAFKA-1697_2015-02-11_18:45:42.patch We removed the ack>1 support from the producer client in KAFKA-1555. We can completely remove the code in the broker that supports ack>1. Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1947) can't explicitly set replica-assignment when add partitions
[ https://issues.apache.org/jira/browse/KAFKA-1947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317513#comment-14317513 ] Honghai Chen commented on KAFKA-1947: - After fixing the command line, the command still does not work; there seem to be more bugs in it. Try the commands below: bin\kafka.cmd topiccmd --create --topic mvlogs --partition 1 --replication-factor 2 --zookeeper localhost bin\kafka.cmd topiccmd --alter --topic mvlogs --partitions 2 --replica-assignment 2:3 --zookeeper localhost The log says: [2015-02-11 19:18:30,791] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected [2015-02-11 19:18:31,427] INFO Add partition list for mvlogs is Map() (kafka.admin.AdminUtils$) [2015-02-11 19:18:31,470] INFO Topic update {version:1,partitions:{0:[3,1]}} (kafka.admin.AdminUtils$) Adding partitions succeeded! [2015-02-11 19:18:31,479] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) Seemingly the JSON string is wrong! Will continue checking it. can't explicitly set replica-assignment when add partitions --- Key: KAFKA-1947 URL: https://issues.apache.org/jira/browse/KAFKA-1947 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Environment: Windows Reporter: Honghai Chen When creating a topic, replicaAssignmentOpt should not appear together with partitions. But when adding partitions, they should be able to appear together. From the code below, you can see that when altering a topic with partitions in the arguments, it tries to get replica-assignment: https://git1-us-west.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;h=285c0333ff43543d3e46444c1cd9374bb883bb59;hb=HEAD#l114 The root cause is the code below: CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt + replicationFactorOpt) https://git1-us-west.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;h=285c0333ff43543d3e46444c1cd9374bb883bb59;hb=HEAD#l304 Related: https://issues.apache.org/jira/browse/KAFKA-1052 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
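For reference, had the alter applied the requested assignment, the topic registration would presumably need an entry for the new partition as well, along the lines of (illustrative):

{"version":1,"partitions":{"0":[3,1],"1":[2,3]}}

i.e. the logged {version:1,partitions:{0:[3,1]}} is missing partition 1 entirely, which matches the empty "Add partition list ... is Map()" line above.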
[jira] [Created] (KAFKA-1947) can't explicitly set replica-assignment when add partitions
Honghai Chen created KAFKA-1947: --- Summary: can't explicitly set replica-assignment when add partitions Key: KAFKA-1947 URL: https://issues.apache.org/jira/browse/KAFKA-1947 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Environment: Windows Reporter: Honghai Chen When creating a topic, replicaAssignmentOpt should not appear together with partitions. But when adding partitions, they should be able to appear together. From the code below, you can see that when altering a topic with partitions in the arguments, it tries to get replica-assignment: https://git1-us-west.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;h=285c0333ff43543d3e46444c1cd9374bb883bb59;hb=HEAD#l114 The root cause is the code below: CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt + replicationFactorOpt) https://git1-us-west.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;h=285c0333ff43543d3e46444c1cd9374bb883bb59;hb=HEAD#l304 Related: https://issues.apache.org/jira/browse/KAFKA-1052 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1938) [doc] Quick start example should reference appropriate Kafka version
[ https://issues.apache.org/jira/browse/KAFKA-1938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317390#comment-14317390 ] Gwen Shapira commented on KAFKA-1938: - +1 LGTM (non-binding) [doc] Quick start example should reference appropriate Kafka version Key: KAFKA-1938 URL: https://issues.apache.org/jira/browse/KAFKA-1938 Project: Kafka Issue Type: Improvement Components: website Affects Versions: 0.8.2.0 Reporter: Stevo Slavic Priority: Trivial Attachments: remove-081-references.patch Kafka 0.8.2.0 documentation, quick start example on https://kafka.apache.org/documentation.html#quickstart in step 1 links and instructs reader to download Kafka 0.8.1.1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 28769: Patch for KAFKA-1809
On Feb. 11, 2015, 9:45 a.m., Jun Rao wrote: Thanks for the new patch. Some more comments. 1. We should think through whether we need to add security protocol to existing tools like SimpleConsumerShell and UpdateOffsetsInZk. 2. There are unused imports. 3. The patch needs a rebase. 4. The following unit test fails consistently. kafka.network.SocketServerTest testSocketsCloseOnShutdown FAILED org.scalatest.junit.JUnitTestFailedError: expected exception when writing to closed plain socket at org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101) at kafka.network.SocketServerTest.newAssertionFailedException(SocketServerTest.scala:37) at org.scalatest.Assertions$class.fail(Assertions.scala:711) at kafka.network.SocketServerTest.fail(SocketServerTest.scala:37) at kafka.network.SocketServerTest.testSocketsCloseOnShutdown(SocketServerTest.scala:162) 1. Since the tools are going through major rewrites now (SimpleConsumer may go away, ConsumerOffsetChecker went away...) and since some tools don't really need to support security protocols at all (I think? I definitely can't see why they need to support both SSL and Kerberos), I'd rather modify the tools as a separate patch once we have security implemented. The larger part of the change will be implementing SSL/Kerberos in the tools anyway... 3. I know :( chasing commits between reviews is the biggest pain here... 4. Will check it out. On Feb. 11, 2015, 9:45 a.m., Jun Rao wrote: core/src/main/scala/kafka/client/ClientUtils.scala, line 111 https://reviews.apache.org/r/28769/diff/17/?file=846148#file846148line111 Should we default protocolType to PLAINTEXT? I prefer not to default. It makes it more challenging to catch cases where the wrong protocol is used accidentally. Do you see a case for defaulting here? On Feb. 11, 2015, 9:45 a.m., Jun Rao wrote: core/src/main/scala/kafka/server/KafkaConfig.scala, lines 148-149 https://reviews.apache.org/r/28769/diff/17/?file=846168#file846168line148 Do we use PLAINTEXT://0.0.0.0:9092 to bind to all interfaces or PLAINTEXT://:9092? If it's the former, does it work for IPv6 and do we need to change getListeners() accordingly? 0.0.0.0 binds to all interfaces. A missing hostname binds to the default interface. It's pretty standard (i.e. matches the NIO bind behavior, and the one for lower-level sockets too) and works the same way as it did before this patch. On Feb. 11, 2015, 9:45 a.m., Jun Rao wrote: core/src/main/scala/kafka/server/KafkaConfig.scala, line 186 https://reviews.apache.org/r/28769/diff/17/?file=846168#file846168line186 This should default to 0.8.2.0. The user will upgrade the broker jar with the default setting first, followed by another rolling bounce after changing the config to 0.8.3.0. Actually, hardcoding the version means having to remember to change this with every release. I'll try to figure out something more reasonable for this. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28769/#review71916 --- On Feb. 3, 2015, 6:52 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28769/ --- (Updated Feb. 3, 2015, 6:52 p.m.) Review request for kafka.
Bugs: KAFKA-1809 https://issues.apache.org/jira/browse/KAFKA-1809 Repository: kafka Description --- changed topicmetadata to include brokerendpoints and fixed few unit tests fixing systest and support for binding to default address fixed system tests fix default address binding and ipv6 support fix some issues regarding endpoint parsing. Also, larger segments for systest make the validation much faster added link to security wiki in doc fixing unit test after rename of ProtocolType to SecurityProtocol Following Joe's advice, added security protocol enum on client side, and modified protocol to use ID instead of string. validate producer config against enum add a second protocol for testing and modify SocketServerTests to check on multi-ports Reverted the metadata request changes and removed the explicit security protocol argument. Instead the socketserver will determine the protocol based on the port and add this to the request bump version for UpdateMetadataRequest and added support for rolling upgrades with new config following tests - fixed LeaderAndISR protocol and ZK registration for backward compatibility cleaned up some changes that were actually not necessary. hopefully making this patch slightly easier to review undoing
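For context, the endpoint syntax under review is of the form protocol://host:port; a server.properties illustration (property name and format follow the KAFKA-1809 patch under review and may still change):

# one endpoint per security protocol; 0.0.0.0 binds all interfaces,
# while an empty host binds the default interface
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093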
Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints
Thanks, Gwen. This looks good to me as far as the wire protocol versioning goes. I agree with you on defaulting to the new wire protocol version for new installs. I think it will also need to be very clear (to the general installer of Kafka, and not just developers) in the documentation when the wire protocol version changes moving forward, and what the risk/benefit of changing to the new version is. Since a rolling upgrade of the intra-cluster protocol is supported, will a rolling downgrade work as well? Should a flaw (bug, security, or otherwise) be discovered after upgrade, is it possible to change the wire.protocol.version back to 0.8.2 and do a rolling bounce? On the host/port/protocol specification, specifically the ZK config format, is it possible to have an un-advertised endpoint? I would see this as potentially useful if you wanted to have an endpoint that you are reserving for intra-cluster communication, and you would prefer to not have it advertised at all. Perhaps it is blocked by a firewall rule or other authentication method. This could also allow you to duplicate a security protocol type but segregate it on a different port or interface (if it is unadvertised, there is no ambiguity to the clients as to which endpoint should be selected). I believe I asked about that previously, and I didn't track what the final outcome was or even if it was discussed further. -Todd On Wed, Feb 11, 2015 at 4:38 PM, Gwen Shapira gshap...@cloudera.com wrote: Added Jun's notes to the KIP (Thanks for explaining so clearly, Jun. I was clearly struggling with this...) and removed the reference to use.new.wire.protocol. On Wed, Feb 11, 2015 at 4:19 PM, Joel Koshy jjkosh...@gmail.com wrote: The description that Jun gave for (2) was the detail I was looking for - Gwen can you update the KIP with that for completeness/clarity? I'm +1 as well overall. However, I think it would be good if we also get an ack from someone who is more experienced on the operations side (say, Todd) to review especially the upgrade plan. On Wed, Feb 11, 2015 at 09:40:50AM -0800, Jun Rao wrote: +1 for proposed changes in 1 and 2. 1. The impact is that if someone uses SimpleConsumer and references Broker explicitly, the application needs a code change to compile with 0.8.3. Since SimpleConsumer is not widely used, breaking the API in SimpleConsumer but maintaining overall code cleanness seems to be a better tradeoff. 2. For clarification, the issue is the following. In 0.8.3, we will be evolving the wire protocol of UpdateMetadataRequest (to send info about endpoints for different security protocols). Since this is used in intra-cluster communication, we need to do the upgrade in two steps. The idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2. When upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After step 1, all brokers will be capable of processing the new protocol in 0.8.3, but without actually using it. In step 2, we configure wire.protocol.version to 0.8.3 in each broker and do another rolling restart. After step 2, all brokers will start using the new protocol in 0.8.3. Let's say that in the next release 0.9, we are changing the intra-cluster wire protocol again. We will do a similar thing: defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can upgrade from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2 to 0.9 directly, they will have to configure wire.protocol.version to 0.8.2 first and then do the two-step upgrade to 0.9.
Gwen, In KIP-2, there is still a reference to use.new.protocol. This needs to be removed. Also, would it be better to use intra.cluster.wire.protocol.version since this only applies to the wire protocol among brokers? Others, The patch in KAFKA-1809 is almost ready. It would be good to wrap up the discussion on KIP-2 soon. So, if you haven't looked at this KIP, please take a look and send your comments. Thanks, Jun On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, While reviewing the patch for KAFKA-1809, we came across two questions that we are interested in hearing the community out on. 1. This patch changes the Broker class and adds a new class BrokerEndPoint that behaves like the previous broker. While technically kafka.cluster.Broker is not part of the public API, it is returned by javaapi, used with the SimpleConsumer. Getting replicas from PartitionMetadata will now return BrokerEndPoint instead of Broker. All method calls remain the same, but since we return a new type, we break the API. Note that this breakage does not prevent upgrades - existing SimpleConsumers will continue working (because we are wire-compatible). The only thing that won't work is
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
Thanks for the explanation, Joel! Would love to see the results of the throughput experiment and I'm a +1 on everything else, including the rebalance callback and record handler. -Neha On Wed, Feb 11, 2015 at 1:13 PM, Jay Kreps jay.kr...@gmail.com wrote: Cool, I agree with all that. I agree about the need for a rebalancing callback. Totally agree about the record handler. It would be great to see if a prototype of this is workable. Thanks guys! -Jay On Wed, Feb 11, 2015 at 12:36 PM, Joel Koshy jjkosh...@gmail.com wrote: Hey Jay, Guozhang, Becket and I got together to discuss this and we think: - It seems that your proposal based on the new consumer and flush call should work. - We would likely need to call the poll with a timeout that matches the offset commit interval in order to deal with low-volume mirroring pipelines. - We will still need a rebalance callback to reduce duplicates - the rebalance callback would need to flush and commit offsets. - The only remaining question is if the overall throughput is sufficient. I think someone at LinkedIn (I don't remember who) did some experiments with data channel size == 1 and ran into issues. That was not thoroughly investigated though. - The addition of flush may actually make this solution viable for the current mirror-maker (with the old consumer). We can prototype that offline and if it works out well we can redo KAFKA-1650 (i.e., refactor the current mirror maker). The flush call and the new consumer didn't exist at the time we did KAFKA-1650 so this did not occur to us. - We think the RecordHandler is still a useful small addition for the use-cases mentioned earlier in this thread. Thanks, Joel On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote: Guozhang, I agree with 1-3, I do think what I was proposing was simpler but perhaps there are gaps in that? Hey Joel--Here was a sketch of what I was proposing. I do think this gets rid of manual offset tracking, especially doing so across threads with dedicated commit threads, which I think is pretty complex.

while (true) {
  val recs = consumer.poll(Long.MaxValue)
  for (rec <- recs)
    producer.send(rec, logErrorCallback)
  if (System.currentTimeMillis - lastCommit > commitInterval) {
    producer.flush()
    consumer.commit()
    lastCommit = System.currentTimeMillis
  }
}

(See the previous email for details). I think the question is: is there any reason--performance, correctness, etc--that this won't work? Basically I think you guys have thought about this more so I may be missing something. If so let's flag it while we still have leeway on the consumer. If we think that will work, well I do think it is conceptually a lot simpler than the current code, though I suppose one could disagree on that. -Jay On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy jjkosh...@gmail.com wrote: Hi Jay, The data channels are actually a big part of the complexity of the zero data loss design, though, right? Because then you need some reverse channel to flow the acks back to the consumer based on where you are versus just acking what you have read and written (as in the code snippet I put up). I'm not sure if we are on the same page. Even if the data channel was not there, the current handling for zero data loss would remain very similar - you would need to maintain lists of unacked source offsets. I'm wondering if the KIP needs more detail on how it is currently implemented; or are you suggesting a different approach (in which case I have not fully understood).
I'm not sure what you mean by flowing acks back to the consumer - the MM commits offsets after the producer ack has been received. There is some additional complexity introduced in reducing duplicates on a rebalance - this is actually optional (since duplicates are currently a given). The reason that was done anyway is that with the auto-commit turned off duplicates are almost guaranteed on a rebalance. I think the point that Neha and I were trying to make was that the motivation to embed stuff into MM kind of is related to how complex a simple consume and produce with good throughput will be. If it is simple to write such a thing in a few lines, the pain of embedding a bunch of stuff won't be worth it, if it has to be as complex as the current mm then of course we will need all kinds of plug ins because no one will be able to write such a thing. I don't have a huge concern with a simple plug-in but I think if it turns into something more complex with filtering and aggregation or whatever we really need to stop and think a bit about the design. I agree - I don't think there is a
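A Java rendering of Jay's loop above (a sketch using consumer/producer method names as they eventually shipped - flush() and commitSync() - rather than the exact API of the time):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MirrorLoop {
    public static void run(KafkaConsumer<byte[], byte[]> consumer,
                           KafkaProducer<byte[], byte[]> producer,
                           long commitIntervalMs) {
        long lastCommit = System.currentTimeMillis();
        while (true) {
            // bound the poll by the commit interval so low-volume pipelines
            // still commit regularly
            ConsumerRecords<byte[], byte[]> recs = consumer.poll(commitIntervalMs);
            for (ConsumerRecord<byte[], byte[]> rec : recs)
                producer.send(new ProducerRecord<byte[], byte[]>(
                    rec.topic(), rec.key(), rec.value()));
            if (System.currentTimeMillis() - lastCommit > commitIntervalMs) {
                producer.flush();      // wait until everything sent so far is acked
                consumer.commitSync(); // only then commit the source offsets
                lastCommit = System.currentTimeMillis();
            }
        }
    }
}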
[jira] [Updated] (KAFKA-1697) remove code related to ack>1 on the broker
[ https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1697: Attachment: KAFKA-1697_2015-02-11_18:47:53.patch remove code related to ack>1 on the broker -- Key: KAFKA-1697 URL: https://issues.apache.org/jira/browse/KAFKA-1697 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-1697.patch, KAFKA-1697_2015-01-14_15:41:37.patch, KAFKA-1697_2015-02-10_17:06:51.patch, KAFKA-1697_2015-02-11_18:45:42.patch, KAFKA-1697_2015-02-11_18:47:53.patch We removed the ack>1 support from the producer client in KAFKA-1555. We can completely remove the code in the broker that supports ack>1. Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29647: Patch for KAFKA-1697
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/ --- (Updated Feb. 12, 2015, 2:47 a.m.) Review request for kafka. Bugs: KAFKA-1697 https://issues.apache.org/jira/browse/KAFKA-1697 Repository: kafka Description (updated) --- added early handling of invalid number of acks to handler and a test merging with current trunk moved check for legal requiredAcks to append and fixed the tests accordingly changing exception back to retriable cleaning unused exceptions Diffs (updated) - clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java a6107b818947d6d6818c85cdffcb2b13f69a55c0 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java a8deac4ce5149129d0a6f44c0526af9d55649a36 core/src/main/scala/kafka/cluster/Partition.scala e6ad8be5e33b6fb31c078ad78f8de709869ddc04 core/src/main/scala/kafka/server/KafkaApis.scala 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala a1f72f8c2042ff2a43af503b2e06f84706dad9db core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala faa907131ed0aa94a7eacb78c1ffb576062be87a Diff: https://reviews.apache.org/r/29647/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-1697) remove code related to ack>1 on the broker
[ https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317475#comment-14317475 ] Gwen Shapira commented on KAFKA-1697: - Updated reviewboard https://reviews.apache.org/r/29647/diff/ against branch trunk remove code related to ack>1 on the broker -- Key: KAFKA-1697 URL: https://issues.apache.org/jira/browse/KAFKA-1697 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-1697.patch, KAFKA-1697_2015-01-14_15:41:37.patch, KAFKA-1697_2015-02-10_17:06:51.patch, KAFKA-1697_2015-02-11_18:45:42.patch, KAFKA-1697_2015-02-11_18:47:53.patch We removed the ack>1 support from the producer client in KAFKA-1555. We can completely remove the code in the broker that supports ack>1. Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29647: Patch for KAFKA-1697
On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala, line 96 https://reviews.apache.org/r/29647/diff/3/?file=860605#file860605line96 Do we need this here? IMO, it's a good idea to have this in any test that starts new threads - to verify that we close them. We have this in a bunch of places. On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala, line 83 https://reviews.apache.org/r/29647/diff/3/?file=860605#file860605line83 Can we just do the assert here instead of throwing an exception? i.e., `assertEquals(responseStatus.values.size, responseStatus.values.count(_.error == INVALID_REQUIRED_ACKS))` Either way is fine. Whichever is clearer although the above may be more concise if it works. Good idea. On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java, line 23 https://reviews.apache.org/r/29647/diff/3/?file=860600#file860600line23 Pending discussion. Changing back to retriable, per discussion on the mailing list. I'm leaving this as a separate exception and error code, in case client developers want to do something with the extra information. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/#review71960 --- On Feb. 11, 2015, 1:06 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/ --- (Updated Feb. 11, 2015, 1:06 a.m.) Review request for kafka. Bugs: KAFKA-1697 https://issues.apache.org/jira/browse/KAFKA-1697 Repository: kafka Description --- added early handling of invalid number of acks to handler and a test merging with current trunk moved check for legal requiredAcks to append and fixed the tests accordingly Diffs - clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java a6107b818947d6d6818c85cdffcb2b13f69a55c0 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java a8deac4ce5149129d0a6f44c0526af9d55649a36 core/src/main/scala/kafka/cluster/Partition.scala e6ad8be5e33b6fb31c078ad78f8de709869ddc04 core/src/main/scala/kafka/server/KafkaApis.scala 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala faa907131ed0aa94a7eacb78c1ffb576062be87a Diff: https://reviews.apache.org/r/29647/diff/ Testing --- Thanks, Gwen Shapira
Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception
As for me, my preference is to leave it as retriable since it is clear that the produce may succeed on a retry (and may introduce a duplicate). I agree that idempotence will bring full closure to this though. +1 On Wed, Feb 11, 2015 at 5:24 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah, hey Joel, that was a super long-winded way of saying let's leave it at Retriable. I agree there is another concept which is non duplicate producing but when we do the idempotence stuff then all things will have that property so it may be okay just to leave it for now since network errors make it impossible to really do this properly without idempotence. -Jay On Wed, Feb 11, 2015 at 5:41 AM, Joel Koshy jjkosh...@gmail.com wrote: Thanks for the comments - however, it is not clear to me what your preference is on making NotEnoughReplicasAfterAppend retriable vs non-retriable. As for me, my preference is to leave it as retriable since it is clear that the produce may succeed on a retry (and may introduce a duplicate). I agree that idempotence will bring full closure to this though. Anyone else have a preference on this? Thanks, Joel On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote: Yeah there are really two concepts here as I think you noted: 1. Retry safe: we know that the write did not occur 2. Retry fixable: if you send that again it could work (probably there are better names for these). Some things we know did not do a write and may be fixed by retrying (no leader). Some things we know didn't do a write and are not worth retrying (message too large). Some things we don't know and are worth retrying (network error), and probably some things we don't know and aren't worth it (can't think of one though). (I feel like Donald Rumsfeld with the known unknowns thing). In the current world if you set retries > 0 you are saying I accept duplicates but want to ensure my stuff gets written, if you set retries = 0 you are saying I can't abide duplicates and am willing to tolerate loss. So Retryable for us means retry may succeed. Originally I thought of maybe trying to model both concepts. However the two arguments against it are: 1. Even if you do this the guarantee remains at least once delivery because: (1) in the network error case you just don't know, (2) consumer failure. 2. The proper fix for this is to add idempotence support on the server, which we should do. Doing idempotence support on the server will actually fix all duplicate problems, including the network error case (because of course the server knows whether your write went through even though the client doesn't). When we have that then the client can always just retry anything marked Retriable (i.e. retry may work) without fear of duplicates. This gives exactly once delivery to the log, and a co-operating consumer can use the offset to dedupe and get it end-to-end. So that was why I had just left one type of Retriable and used it to mean retry may work and don't try to flag anything for duplicates. -Jay On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, Need your thoughts on retriable exceptions: If a user configures Kafka with min.isr > 1 and there are not enough replicas to safely store the data, there are two possibilities: 1. The lack of replicas was discovered before the message was written. We throw NotEnoughReplicas. 2. The lack of replicas was discovered after the message was written to the leader. In this case, we throw NotEnoughReplicasAfterAppend. Currently, both errors are Retriable. 
Which means that the new producer will retry multiple times. In case of the second exception, this will cause duplicates. KAFKA-1697 suggests: we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. I agreed that the client (the one using the Producer) should weigh the problems duplicates will cause vs. the probability of losing the message and do something sensible, and I made the exception non-retriable. In the RB (https://reviews.apache.org/r/29647/) Joel raised a good point: (Joel, feel free to correct me if I misrepresented your point) I think our interpretation of retriable is as follows (but we can discuss on the list if that needs to change): if the produce request hits an error, and there is absolutely no point in retrying then that is a non-retriable error. MessageSizeTooLarge is an example - since unless the producer changes the request to make the messages smaller there is no point in retrying. ... Duplicates can arise even for other errors (e.g., request timed out). So that side-effect is not compelling enough to warrant a change to make this non-retriable.
[jira] [Updated] (KAFKA-1938) [doc] Quick start example should reference appropriate Kafka version
[ https://issues.apache.org/jira/browse/KAFKA-1938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1938: Assignee: Manikumar Reddy [doc] Quick start example should reference appropriate Kafka version Key: KAFKA-1938 URL: https://issues.apache.org/jira/browse/KAFKA-1938 Project: Kafka Issue Type: Improvement Components: website Affects Versions: 0.8.2.0 Reporter: Stevo Slavic Assignee: Manikumar Reddy Priority: Trivial Attachments: remove-081-references.patch In the Kafka 0.8.2.0 documentation, the quick start example on https://kafka.apache.org/documentation.html#quickstart in step 1 links to and instructs the reader to download Kafka 0.8.1.1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29647: Patch for KAFKA-1697
On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java, line 23 https://reviews.apache.org/r/29647/diff/3/?file=860600#file860600line23 Pending discussion. Gwen Shapira wrote: Changing back to retriable, per discussion on the mailing list. I'm leaving this as a separate exception and error code, in case client developers want to do something with the extra information. Gwen Shapira wrote: Actually, since we are here, I'll remove the error code from kafka.common and use the o.a.k.common.errors everywhere. We are transitioning there anyway. Never mind on the last comment. I have unit tests for NotEnoughReplicas that use the Scala producer, and that requires the kafka.common error. I'll let Jeff figure out how to disentangle the Scala producer from the existing Scala error codes. It's way beyond scope here :) - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/#review71960 --- On Feb. 11, 2015, 1:06 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/ --- (Updated Feb. 11, 2015, 1:06 a.m.) Review request for kafka. Bugs: KAFKA-1697 https://issues.apache.org/jira/browse/KAFKA-1697 Repository: kafka Description --- added early handling of invalid number of acks to handler and a test merging with current trunk moved check for legal requiredAcks to append and fixed the tests accordingly Diffs - clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java a6107b818947d6d6818c85cdffcb2b13f69a55c0 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java a8deac4ce5149129d0a6f44c0526af9d55649a36 core/src/main/scala/kafka/cluster/Partition.scala e6ad8be5e33b6fb31c078ad78f8de709869ddc04 core/src/main/scala/kafka/server/KafkaApis.scala 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala faa907131ed0aa94a7eacb78c1ffb576062be87a Diff: https://reviews.apache.org/r/29647/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-1948) kafka.api.consumerTests are hanging
[ https://issues.apache.org/jira/browse/KAFKA-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317722#comment-14317722 ] Guozhang Wang commented on KAFKA-1948: -- Gwen, Just ran locally a few times and I do see this issue coming up; will take a look at it asap. kafka.api.consumerTests are hanging --- Key: KAFKA-1948 URL: https://issues.apache.org/jira/browse/KAFKA-1948 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Noticed today that very often when I run the full test suite, it hangs on kafka.api.consumerTest (not always the same test though). It doesn't reproduce 100% of the time, but enough to be very annoying. I also saw it happening on trunk after KAFKA-1333: https://builds.apache.org/view/All/job/Kafka-trunk/389/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1948) kafka.api.consumerTests are hanging
[ https://issues.apache.org/jira/browse/KAFKA-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317713#comment-14317713 ] Gwen Shapira commented on KAFKA-1948: - [~guozhang] [~onurkaraman] - since you worked on KAFKA-1333, can you check if the hangs are related? kafka.api.consumerTests are hanging --- Key: KAFKA-1948 URL: https://issues.apache.org/jira/browse/KAFKA-1948 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Noticed today that very often when I run the full test suite, it hangs on kafka.api.consumerTest (not always the same test though). It doesn't reproduce 100% of the time, but enough to be very annoying. I also saw it happening on trunk after KAFKA-1333: https://builds.apache.org/view/All/job/Kafka-trunk/389/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1948) kafka.api.consumerTests are hanging
Gwen Shapira created KAFKA-1948: --- Summary: kafka.api.consumerTests are hanging Key: KAFKA-1948 URL: https://issues.apache.org/jira/browse/KAFKA-1948 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Noticed today that very often when I run the full test suite, it hangs on kafka.api.consumerTest (not always the same test though). It doesn't reproduce 100% of the time, but enough to be very annoying. I also saw it happening on trunk after KAFKA-1333: https://builds.apache.org/view/All/job/Kafka-trunk/389/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1887) controller error message on shutting the last broker
[ https://issues.apache.org/jira/browse/KAFKA-1887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317695#comment-14317695 ] Gwen Shapira commented on KAFKA-1887: - [~harsha_ch] I think the idea was simply to move kafkaHealthCheck.shutdown() to after kafkaController.shutdown() in the broker shutdown sequence. This happens after the ControlledShutdownRequest, so it should be ok, right? controller error message on shutting the last broker Key: KAFKA-1887 URL: https://issues.apache.org/jira/browse/KAFKA-1887 Project: Kafka Issue Type: Bug Components: core Reporter: Jun Rao Assignee: Sriharsha Chintalapani Priority: Minor Fix For: 0.8.3 We always see the following error in the state-change log on shutting down the last broker. [2015-01-20 13:21:04,036] ERROR Controller 0 epoch 3 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger) kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)] at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75) at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:357) at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:206) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117) at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:446) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at 
org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
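[Editor's note] To make the proposed reordering concrete, a minimal Scala sketch; the member names mirror the comment above, but the bodies are placeholders rather than KafkaServer's real shutdown code.

    object ShutdownOrderSketch {
      object kafkaController  { def shutdown(): Unit = () } // stand-in
      object kafkaHealthCheck { def shutdown(): Unit = () } // stand-in

      // Proposed order: stop the controller before the health check
      // deregisters this broker from ZooKeeper, so the controller never
      // tries to elect a leader against an empty live-broker set (the
      // NoReplicaOnlineException above). The ControlledShutdownRequest has
      // already completed by this point.
      def shutdownSequence(): Unit = {
        kafkaController.shutdown()
        kafkaHealthCheck.shutdown()
      }
    }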
Build failed in Jenkins: KafkaPreCommit #3
See https://builds.apache.org/job/KafkaPreCommit/3/ -- [...truncated 901 lines...] kafka.log.LogManagerTest testCheckpointRecoveryPoints PASSED kafka.log.LogManagerTest testRecoveryDirectoryMappingWithTrailingSlash PASSED kafka.log.LogManagerTest testRecoveryDirectoryMappingWithRelativeDirectory PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[0] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[1] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[2] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[3] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[4] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[5] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[6] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[7] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[8] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[9] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[10] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[11] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[12] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[13] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[14] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[15] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[16] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[17] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[18] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[19] PASSED kafka.log.CleanerTest testCleanSegments PASSED kafka.log.CleanerTest testCleaningWithDeletes PASSED kafka.log.CleanerTest testCleanSegmentsWithAbort PASSED kafka.log.CleanerTest testSegmentGrouping PASSED kafka.log.CleanerTest testBuildOffsetMap PASSED kafka.log.OffsetMapTest testBasicValidation PASSED kafka.log.OffsetMapTest testClear PASSED kafka.log.FileMessageSetTest testWrittenEqualsRead PASSED kafka.log.FileMessageSetTest testIteratorIsConsistent PASSED kafka.log.FileMessageSetTest testSizeInBytes PASSED kafka.log.FileMessageSetTest testWriteTo PASSED kafka.log.FileMessageSetTest testFileSize PASSED kafka.log.FileMessageSetTest testIterationOverPartialAndTruncation PASSED kafka.log.FileMessageSetTest testIterationDoesntChangePosition PASSED kafka.log.FileMessageSetTest testRead PASSED kafka.log.FileMessageSetTest testSearch PASSED kafka.log.FileMessageSetTest testIteratorWithLimits PASSED kafka.log.FileMessageSetTest testTruncate PASSED kafka.log.LogCleanerIntegrationTest cleanerTest PASSED kafka.log.LogSegmentTest testTruncate PASSED kafka.log.LogSegmentTest testReadOnEmptySegment PASSED kafka.log.LogSegmentTest testReadBeforeFirstOffset PASSED kafka.log.LogSegmentTest testMaxOffset PASSED kafka.log.LogSegmentTest testReadAfterLast PASSED kafka.log.LogSegmentTest testReadFromGap PASSED kafka.log.LogSegmentTest testTruncateFull PASSED kafka.log.LogSegmentTest testNextOffsetCalculation PASSED kafka.log.LogSegmentTest testChangeFileSuffixes PASSED kafka.log.LogSegmentTest testRecoveryFixesCorruptIndex PASSED kafka.log.LogSegmentTest testRecoveryWithCorruptMessage PASSED kafka.producer.SyncProducerTest testReachableServer PASSED kafka.producer.SyncProducerTest testEmptyProduceRequest PASSED kafka.producer.SyncProducerTest testMessageSizeTooLarge PASSED kafka.producer.SyncProducerTest testMessageSizeTooLargeWithAckZero PASSED 
kafka.producer.SyncProducerTest testProduceCorrectlyReceivesResponse PASSED kafka.producer.SyncProducerTest testProducerCanTimeout PASSED kafka.producer.SyncProducerTest testProduceRequestWithNoResponse PASSED kafka.producer.SyncProducerTest testNotEnoughReplicas PASSED kafka.producer.AsyncProducerTest testProducerQueueSize PASSED kafka.producer.AsyncProducerTest testProduceAfterClosed PASSED kafka.producer.AsyncProducerTest testBatchSize PASSED kafka.producer.AsyncProducerTest testQueueTimeExpired PASSED kafka.producer.AsyncProducerTest testPartitionAndCollateEvents PASSED kafka.producer.AsyncProducerTest testSerializeEvents PASSED kafka.producer.AsyncProducerTest testInvalidPartition PASSED kafka.producer.AsyncProducerTest testNoBroker PASSED kafka.producer.AsyncProducerTest testIncompatibleEncoder PASSED kafka.producer.AsyncProducerTest testRandomPartitioner PASSED kafka.producer.AsyncProducerTest testFailedSendRetryLogic PASSED kafka.producer.AsyncProducerTest testJavaProducer PASSED kafka.producer.AsyncProducerTest testInvalidConfiguration PASSED kafka.producer.ProducerTest testUpdateBrokerPartitionInfo PASSED kafka.producer.ProducerTest testSendToNewTopic PASSED kafka.producer.ProducerTest testSendWithDeadBroker PASSED
Re: Review Request 29647: Patch for KAFKA-1697
On Feb. 12, 2015, 4:20 a.m., Joe Stein wrote: core/src/main/scala/kafka/server/ReplicaManager.scala, line 307 https://reviews.apache.org/r/29647/diff/5/?file=861501#file861501line307 Could we change this to a match case? requiredAcks match { case 0 => {} case 1 => {} case -1 => {} case _ => {} } to make it more concise to read through and more scalish Unfortunately, I couldn't use a switch on requiredAcks. There's logic for whether we need to wait for replication based on both requiredAcks and other factors, and splitting this logic into multiple switches / conditions makes things less readable. I do agree that it was challenging to read, so I did some refactoring. I hope the new version is clearer, even if not very scalish. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/#review72107 --- On Feb. 12, 2015, 7:14 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/ --- (Updated Feb. 12, 2015, 7:14 a.m.) Review request for kafka. Bugs: KAFKA-1697 https://issues.apache.org/jira/browse/KAFKA-1697 Repository: kafka Description --- added early handling of invalid number of acks to handler and a test merging with current trunk moved check for legal requiredAcks to append and fixed the tests accordingly changing exception back to retriable cleaning unused exceptions refactored appendToLog for clarity Diffs - clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java a6107b818947d6d6818c85cdffcb2b13f69a55c0 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java a8deac4ce5149129d0a6f44c0526af9d55649a36 core/src/main/scala/kafka/cluster/Partition.scala e6ad8be5e33b6fb31c078ad78f8de709869ddc04 core/src/main/scala/kafka/server/KafkaApis.scala 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala a1f72f8c2042ff2a43af503b2e06f84706dad9db core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala faa907131ed0aa94a7eacb78c1ffb576062be87a Diff: https://reviews.apache.org/r/29647/diff/ Testing --- Thanks, Gwen Shapira
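[Editor's note] For reference, a compilable version of the match Joe sketches (the arrows were mangled in the archive); the branch bodies are placeholders, not ReplicaManager's real logic, which, as Gwen notes, also has to decide whether to wait for replication based on more than requiredAcks alone.

    def handleRequiredAcks(requiredAcks: Int): Unit = requiredAcks match {
      case 0  => () // fire-and-forget: no acknowledgement required
      case 1  => () // leader acknowledgement only
      case -1 => () // wait for the in-sync replicas
      case _  => () // anything else maps to InvalidRequiredAcksException
    }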
[jira] [Updated] (KAFKA-1945) MetaData Response - Broker hostname is wrong
[ https://issues.apache.org/jira/browse/KAFKA-1945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] saravana kumar updated KAFKA-1945: -- Description: I use python-kafka's SimpleConsumer to listen to a topic in a Kafka broker. The Kafka broker is running on a machine whose hostname is BROKER_HOST. Now, a SimpleConsumer from another machine requests topic metadata from the broker BROKER_HOST for a topic TOPIC and gets a Python tuple (Broker metadata, Topic metadata). Broker metadata comes back as {0: BrokerMetadata(nodeId=0, host='localhost', port=9092)} Ideally, the host value must be BROKER_HOST (the hostname cmd from the broker's shell tty confirms it), but it comes back as localhost... How does the wrong broker metadata for a topic get into the Kafka system? And obviously, this breaks the system, since my consumer tries to connect to 9092 on its own localhost. MetaData Response - Broker hostname is wrong Key: KAFKA-1945 URL: https://issues.apache.org/jira/browse/KAFKA-1945 Project: Kafka Issue Type: Bug Components: core Reporter: saravana kumar I use python-kafka's SimpleConsumer to listen to a topic in a Kafka broker. The Kafka broker is running on a machine whose hostname is BROKER_HOST. Now, a SimpleConsumer from another machine requests topic metadata from the broker BROKER_HOST for a topic TOPIC and gets a Python tuple (Broker metadata, Topic metadata). Broker metadata comes back as {0: BrokerMetadata(nodeId=0, host='localhost', port=9092)} Ideally, the host value must be BROKER_HOST (the hostname cmd from the broker's shell tty confirms it), but it comes back as localhost... How does the wrong broker metadata for a topic get into the Kafka system? And obviously, this breaks the system, since my consumer tries to connect to 9092 on its own localhost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Kafka New(Java) Producer Connection reset by peer error and LB
Agree with Jay. It's unfortunate that this gets logged because in this case it's just noise, but this is an exception that can happen both in potentially bad cases (remote peer closed connection forcibly with outstanding unprocessed data) and in normal cases that aren't problematic (TCP connection timeout). I'm pretty sure some load balancers, e.g. HAProxy, disable socket lingering to avoid time-wait (i.e. they send a RST even when they could use a FIN), which helps them avoid socket starvation. I think the generalized bug this is an instance of is that we're relying on timeouts in lower layers, like TCP timeouts, to clean up after us. Ideally anything that might trigger a timeout in a lower layer could, with the correct settings, be caught and cleaned up earlier by Kafka. This means adding timeouts on resources managed by Kafka, such as TCP connections to brokers as KAFKA-1941 suggests. On Tue, Feb 10, 2015 at 1:45 PM, Jay Kreps jay.kr...@gmail.com wrote: I don't think this is a bug. Currently we don't support timing out connections in the clients, which would be a good feature to add. As a result the connection remains until the LB kills it. When that happens you get a message logged that the connection was unexpectedly closed, which I think is what should happen (you can disable the logging in log4j if you don't want it). It would be nice to implement a client-side connection LRU for unused connections. I filed a ticket to track this: https://issues.apache.org/jira/browse/KAFKA-1941 -Jay On Tue, Feb 10, 2015 at 11:33 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Ewen, The root of the problem is a leak of a TCP connection that sits idle for a while. It is just a log message as you mentioned, but suppose you have 50 or more producer instances created by an application and every one of them prints the above log; that becomes a bit of a concern. We configured the producer with a bootstrap list of LB:port1, which is set to TCP port-forward to broker:port2. When the producer fetches the cluster metadata and discovers that the TCP connection LB:port1 is not part of the broker cluster or topology, it should close the connection to LB:port1 (in my opinion, this would be expected behavior). As you mentioned, the producer behavior is normal and this error is harmless. If you consider this a bug, please let me know and I will file a JIRA ticket for it. We are on non-release 0.8.2 from trunk. Thanks, Bhavesh Thanks, Bhavesh On Tue, Feb 10, 2015 at 12:29 AM, Ewen Cheslack-Postava e...@confluent.io wrote: Bhavesh, I'm unclear what the impact is here. The line numbers don't match up exactly with trunk or 0.8.2.0, but it looks like this exception is just caught and logged. As far as I can tell the producer would continue to function normally. Does this have any impact on the producer or is the concern just that the exception is being logged? On Mon, Feb 9, 2015 at 11:21 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team, Please confirm if you would like us to open a JIRA issue to track this? Thanks, Bhavesh On Mon, Feb 9, 2015 at 12:39 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team, We are getting this connection reset by peers a couple of minutes after start-up of the producer, due to infrastructure deployment strategies we have adopted from LinkedIn. We have the LB hostname and port as the seed server, and all producers are getting the following exception because of the TCP idle connection timeout set on the LB (which is 2 minutes, while the Kafka TCP connection idle timeout is set to 10 minutes). 
This seems to be a minor bug: the client should immediately close the TCP connection once it discovers that the seed server is not part of the broker list. java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198) at sun.nio.ch.IOUtil.read(IOUtil.java:171) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60) at org.apache.kafka.common.network.Selector.poll(Selector.java:242) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:662) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) at
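[Editor's note] To illustrate the client-side connection LRU Jay files as KAFKA-1941, here is a hedged, self-contained Scala sketch; it is not Kafka code, just the data-structure idea: an insertion-ordered map where touching a node re-inserts it, so the head is always the least recently used connection.

    import java.nio.channels.SocketChannel
    import scala.collection.mutable.LinkedHashMap

    class IdleConnectionLru(maxIdleMs: Long) {
      private val lru = LinkedHashMap[String, (SocketChannel, Long)]()

      // Record activity on a connection; re-inserting moves it to the tail.
      def touch(node: String, channel: SocketChannel): Unit = {
        lru.remove(node)
        lru.put(node, (channel, System.currentTimeMillis))
      }

      // Close and evict connections idle longer than maxIdleMs, oldest first.
      def expireIdle(now: Long = System.currentTimeMillis): Unit =
        while (lru.headOption.exists { case (_, (_, last)) => now - last > maxIdleMs }) {
          val (node, (channel, _)) = lru.head
          channel.close()
          lru.remove(node)
        }
    }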
Re: Review Request 28769: Patch for KAFKA-1809
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28769/#review71916 --- Thanks for the new patch. Some more comments. 1. We should think through whether we need to add security protocol to existing tools like SimpleConsumerShell and UpdateOffsetsInZk. 2. There are unused imports. 3. The patch needs rebase. 4. The following unit test fails consistently. kafka.network.SocketServerTest testSocketsCloseOnShutdown FAILED org.scalatest.junit.JUnitTestFailedError: expected exception when writing to closed plain socket at org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101) at kafka.network.SocketServerTest.newAssertionFailedException(SocketServerTest.scala:37) at org.scalatest.Assertions$class.fail(Assertions.scala:711) at kafka.network.SocketServerTest.fail(SocketServerTest.scala:37) at kafka.network.SocketServerTest.testSocketsCloseOnShutdown(SocketServerTest.scala:162) clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java https://reviews.apache.org/r/28769/#comment117835 Do we need to remove final? core/src/main/scala/kafka/client/ClientUtils.scala https://reviews.apache.org/r/28769/#comment117853 Should we default protocolType to PLAINTEXT? core/src/main/scala/kafka/cluster/Broker.scala https://reviews.apache.org/r/28769/#comment117854 Instead of representing endpoints as a single string, should we represent them as an array of strings, one for each endpoint? core/src/main/scala/kafka/cluster/BrokerEndPoint.scala https://reviews.apache.org/r/28769/#comment117855 For ease of reading, perhaps we can add a comment on what the uri format is? core/src/main/scala/kafka/cluster/BrokerEndPoint.scala https://reviews.apache.org/r/28769/#comment117856 Capitalize the first word of each sentence and end the sentence with . This applies to a few other places too. core/src/main/scala/kafka/consumer/ConsumerConfig.scala https://reviews.apache.org/r/28769/#comment117857 I thought we wouldn't support security in the old client? core/src/main/scala/kafka/server/KafkaConfig.scala https://reviews.apache.org/r/28769/#comment117861 If host.name is not specified, should we generate PLAINTEXT://:6667 instead? When converting that to a BrokerEndPoint, do we get a null for host? It will be useful to add a test for that. core/src/main/scala/kafka/server/KafkaConfig.scala https://reviews.apache.org/r/28769/#comment117863 Same comment as above, does a null string translate into a null in endPoint.host? core/src/main/scala/kafka/server/KafkaConfig.scala https://reviews.apache.org/r/28769/#comment117862 Do we use PLAINTEXT://0.0.0.0:9092 to bind to all interfaces or PLAINTEXT://:9092? If it's the former, does it work for IPv6 and do we need to change getListeners() accordingly? core/src/main/scala/kafka/server/KafkaConfig.scala https://reviews.apache.org/r/28769/#comment117859 Would it be better to make this a sequence of strings (like logDirs)? core/src/main/scala/kafka/server/KafkaConfig.scala https://reviews.apache.org/r/28769/#comment117858 This should default to 0.8.2.0. The user will upgrade the broker jar with the default setting first, followed by another rolling bounce after changing the config to 0.8.3.0. - Jun Rao On Feb. 3, 2015, 6:52 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28769/ --- (Updated Feb. 3, 2015, 6:52 p.m.) Review request for kafka. 
Bugs: KAFKA-1809 https://issues.apache.org/jira/browse/KAFKA-1809 Repository: kafka Description --- changed topicmetadata to include brokerendpoints and fixed few unit tests fixing systest and support for binding to default address fixed system tests fix default address binding and ipv6 support fix some issues regarding endpoint parsing. Also, larger segments for systest make the validation much faster added link to security wiki in doc fixing unit test after rename of ProtocolType to SecurityProtocol Following Joe's advice, added security protocol enum on client side, and modified protocol to use ID instead of string. validate producer config against enum add a second protocol for testing and modify SocketServerTests to check on multi-ports Reverted the metadata request changes and removed the explicit security protocol argument. Instead the socketserver will determine the protocol based on the port and add this to the request bump version for UpdateMetadataRequest and added support for rolling upgrades with new config
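[Editor's note] Since several review comments concern the listener string format, a minimal Scala sketch of parsing PROTOCOL://host:port, assuming (per the review) that an empty host means "bind to all interfaces". The names and regex are illustrative, not the patch's actual parser, and bracketed IPv6 literals, which Jun raises, would need a richer pattern.

    object EndPointSketch {
      case class EndPoint(protocol: String, host: Option[String], port: Int)

      val EndPointPattern = """(\w+)://([^:]*):(\d+)""".r

      def parseEndPoint(uri: String): EndPoint = uri match {
        case EndPointPattern(proto, host, port) =>
          // "PLAINTEXT://:9092" yields an empty host, i.e. bind-all.
          EndPoint(proto, if (host.isEmpty) None else Some(host), port.toInt)
        case _ =>
          throw new IllegalArgumentException("Unparseable endpoint: " + uri)
      }
    }

For example, parseEndPoint("PLAINTEXT://:9092") yields EndPoint("PLAINTEXT", None, 9092).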
[jira] [Resolved] (KAFKA-1377) transient unit test failure in LogOffsetTest
[ https://issues.apache.org/jira/browse/KAFKA-1377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy resolved KAFKA-1377. Resolution: Fixed Now I am not getting this exception, so closing the issue. transient unit test failure in LogOffsetTest Key: KAFKA-1377 URL: https://issues.apache.org/jira/browse/KAFKA-1377 Project: Kafka Issue Type: Bug Components: core Reporter: Jun Rao Assignee: Jun Rao Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1377.patch, KAFKA-1377_2014-04-11_17:42:13.patch, KAFKA-1377_2014-04-11_18:14:45.patch Saw the following transient unit test failure. kafka.server.LogOffsetTest testGetOffsetsBeforeEarliestTime FAILED junit.framework.AssertionFailedError: expected:<List(0)> but was:<Vector()> at junit.framework.Assert.fail(Assert.java:47) at junit.framework.Assert.failNotEquals(Assert.java:277) at junit.framework.Assert.assertEquals(Assert.java:64) at junit.framework.Assert.assertEquals(Assert.java:71) at kafka.server.LogOffsetTest.testGetOffsetsBeforeEarliestTime(LogOffsetTest.scala:198) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1945) MetaData Response - Broker hostname is wrong
saravana kumar created KAFKA-1945: - Summary: MetaData Response - Broker hostname is wrong Key: KAFKA-1945 URL: https://issues.apache.org/jira/browse/KAFKA-1945 Project: Kafka Issue Type: Bug Components: core Reporter: saravana kumar -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1945) MetaData Response - Broker hostname is wrong
[ https://issues.apache.org/jira/browse/KAFKA-1945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315841#comment-14315841 ] Manikumar Reddy commented on KAFKA-1945: You can set the host.name and advertised.host.name server config properties to bind to and advertise a particular host name. https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan'tmyconsumers/producersconnecttothebrokers? Also, please post such questions to the mailing list (https://kafka.apache.org/contact.html). MetaData Response - Broker hostname is wrong Key: KAFKA-1945 URL: https://issues.apache.org/jira/browse/KAFKA-1945 Project: Kafka Issue Type: Bug Components: core Reporter: saravana kumar I use python-kafka's SimpleConsumer to listen to a topic in a Kafka broker. The Kafka broker is running on a machine whose hostname is BROKER_HOST. Now, a SimpleConsumer from another machine requests topic metadata from the broker BROKER_HOST for a topic TOPIC and gets a Python tuple (Broker metadata, Topic metadata). Broker metadata comes back as {0: BrokerMetadata(nodeId=0, host='localhost', port=9092)} Ideally, the host value must be BROKER_HOST (the hostname cmd from the broker's shell tty confirms it), but it comes back as localhost... How does the wrong broker metadata for a topic get into the Kafka system? And obviously, this breaks the system, since my consumer tries to connect to 9092 on its own localhost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
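[Editor's note] For anyone hitting the same symptom, the relevant server.properties entries look roughly like this (hostnames and port are illustrative; comments must sit on their own lines, since Java properties files treat a mid-line # as part of the value):

    # The interface the broker binds to.
    host.name=broker1.example.com
    # What the broker publishes in metadata responses; set these when
    # clients would otherwise be told to connect to "localhost".
    advertised.host.name=broker1.example.com
    advertised.port=9092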
Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception
Thanks for the comments - however, it is not clear to me what your preference is on making NotEnoughReplicasAfterAppend retriable vs non-retriable. As for me, my preference is to leave it as retriable since it is clear that the produce may succeed on a retry (and may introduce a duplicate). I agree that idempotence will bring full closure to this though. Anyone else have a preference on this? Thanks, Joel On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote: Yeah there are really two concepts here as I think you noted: 1. Retry safe: we know that the write did not occur 2. Retry fixable: if you send that again it could work (probably there are better names for these). Some things we know did not do a write and may be fixed by retrying (no leader). Some things we know didn't do a write and are not worth retrying (message too large). Some things we don't know and are worth retrying (network error), and probably some things we don't know and aren't worth it (can't think of one though). (I feel like Donald Rumsfeld with the known unknowns thing). In the current world if you set retries > 0 you are saying I accept duplicates but want to ensure my stuff gets written, if you set retries = 0 you are saying I can't abide duplicates and am willing to tolerate loss. So Retryable for us means retry may succeed. Originally I thought of maybe trying to model both concepts. However the two arguments against it are: 1. Even if you do this the guarantee remains at least once delivery because: (1) in the network error case you just don't know, (2) consumer failure. 2. The proper fix for this is to add idempotence support on the server, which we should do. Doing idempotence support on the server will actually fix all duplicate problems, including the network error case (because of course the server knows whether your write went through even though the client doesn't). When we have that then the client can always just retry anything marked Retriable (i.e. retry may work) without fear of duplicates. This gives exactly once delivery to the log, and a co-operating consumer can use the offset to dedupe and get it end-to-end. So that was why I had just left one type of Retriable and used it to mean retry may work and don't try to flag anything for duplicates. -Jay On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, Need your thoughts on retriable exceptions: If a user configures Kafka with min.isr > 1 and there are not enough replicas to safely store the data, there are two possibilities: 1. The lack of replicas was discovered before the message was written. We throw NotEnoughReplicas. 2. The lack of replicas was discovered after the message was written to the leader. In this case, we throw NotEnoughReplicasAfterAppend. Currently, both errors are Retriable. Which means that the new producer will retry multiple times. In case of the second exception, this will cause duplicates. KAFKA-1697 suggests: we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. I agreed that the client (the one using the Producer) should weigh the problems duplicates will cause vs. the probability of losing the message and do something sensible, and I made the exception non-retriable. 
In the RB (https://reviews.apache.org/r/29647/) Joel raised a good point: (Joel, feel free to correct me if I misrepresented your point) I think our interpretation of retriable is as follows (but we can discuss on the list if that needs to change): if the produce request hits an error, and there is absolutely no point in retrying then that is a non-retriable error. MessageSizeTooLarge is an example - since unless the producer changes the request to make the messages smaller there is no point in retrying. ... Duplicates can arise even for other errors (e.g., request timed out). So that side-effect is not compelling enough to warrant a change to make this non-retriable. (TL;DR) Should exceptions where retries can cause duplicates still be retriable? Gwen
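[Editor's note] To ground the terminology, a toy Scala sketch (not Kafka's actual client code) of how a single "retriable" marker drives the retry decision; it is exactly this blanket check that makes the duplicate side-effect possible for NotEnoughReplicasAfterAppend.

    object RetrySketch {
      // Marker for "retry may succeed" -- Jay's "retry fixable".
      class RetriableException(msg: String) extends RuntimeException(msg)

      // The append already happened on the leader, so a retry can duplicate
      // it, yet a retry may also succeed once the ISR recovers: retriable.
      class NotEnoughReplicasAfterAppendException(msg: String) extends RetriableException(msg)

      // No retry can help until the producer shrinks the message: non-retriable.
      class MessageSizeTooLargeException(msg: String) extends RuntimeException(msg)

      def shouldRetry(t: Throwable, retriesLeft: Int): Boolean =
        retriesLeft > 0 && t.isInstanceOf[RetriableException]
    }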
Re: Review Request 29647: Patch for KAFKA-1697
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/#review71960 --- clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java https://reviews.apache.org/r/29647/#comment117874 Pending discussion. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala https://reviews.apache.org/r/29647/#comment117872 Can we just do the assert here instead of throwing an exception? i.e., `assertEquals(responseStatus.values.size, responseStatus.values.count(_.error == INVALID_REQUIRED_ACKS))` Either way is fine. Whichever is clearer although the above may be more concise if it works. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala https://reviews.apache.org/r/29647/#comment117873 Do we need this here? - Joel Koshy On Feb. 11, 2015, 1:06 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29647/ --- (Updated Feb. 11, 2015, 1:06 a.m.) Review request for kafka. Bugs: KAFKA-1697 https://issues.apache.org/jira/browse/KAFKA-1697 Repository: kafka Description --- added early handling of invalid number of acks to handler and a test merging with current trunk moved check for legal requiredAcks to append and fixed the tests accordingly Diffs - clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java a6107b818947d6d6818c85cdffcb2b13f69a55c0 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java a8deac4ce5149129d0a6f44c0526af9d55649a36 core/src/main/scala/kafka/cluster/Partition.scala e6ad8be5e33b6fb31c078ad78f8de709869ddc04 core/src/main/scala/kafka/server/KafkaApis.scala 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala faa907131ed0aa94a7eacb78c1ffb576062be87a Diff: https://reviews.apache.org/r/29647/diff/ Testing --- Thanks, Gwen Shapira