Re: [Java New Producer Kafka Trunk ] Need a State Check API Method
I agree that if the producer is closed and you then try to send a message, it will report that it is closed. What we have done is wrap the NEW Producer API behind our existing Old Producer API wrapper, and when I run the same code against the OLD producer I do not see this issue; it only occurs with the NEW producer. Regardless of the close-state question, I think it would be good to have an API to check the state of the producer (at least an isClosed() method). If you agree, I can file a Jira request for a state-check API; let me know which flavor of state-check API you prefer. Thanks, Bhavesh

On Mon, Oct 6, 2014 at 9:34 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Bhavesh, This is a sanity check. If you send a message after calling close on the producer, you should get this error. It sounds like you have multiple threads sending, you close the producer in the middle of this, and then you get this error. This is expected. Perhaps I am misunderstanding? I think tracking the state (i.e. whether you have called close or not) can be done just as easily in your code, right? -Jay

On Sun, Oct 5, 2014 at 7:32 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Kafka Dev Team, *java.lang.IllegalStateException: Cannot send after the producer is closed.* The above seems to be a bug. If a ProducerRecord is in flight inside the send() method while another thread shuts the producer down, the send will get this error. Thanks, Bhavesh

On Sun, Oct 5, 2014 at 7:15 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Kafka Dev Team, The use case is that we need to know the producer state in background threads so we can decide whether to submit a message. This seems to be a bug in trunk code. I have noticed that KafkaProducer itself does not expose its closed state, and in-flight messages will encounter the following exception. Should I file a bug for this issue?

java.lang.IllegalStateException: Cannot send after the producer is closed.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:136)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:237)
    ...
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)

Thanks, Bhavesh

On Sun, Oct 5, 2014 at 3:30 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Kafka Dev, I would like to request a producer state-check API so I can manage the life cycle of the producer better. If you guys agree, I will file a Jira request. For example, based on the state I could start (create a new instance of the producer), restart, or close it. I just gave example states; you may add or remove states.

/***
 * API TO CHECK STATE OF PRODUCER
 * @return STATE.INIT_IN_PROGRESS, STATE.INIT_DONE, STATE.RUNNING,
 *         STATE.CLOSE_REQUESTED, STATE.CLOSE_IN_PROGRESS, STATE.CLOSED
 */
public State getCurrentState();

Thanks, Bhavesh
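Jay's suggestion in this thread - track the close state in the caller's own code - can be sketched as a thin wrapper. This is a minimal illustration with hypothetical names, not part of the Kafka API; note it narrows but does not fully close the race Bhavesh describes (a send already inside the producer when close() runs can still throw):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper tracking close state outside the producer.
// The real producer.send()/producer.close() calls are left as comments
// so this sketch stays self-contained.
class TrackedProducer {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    boolean isClosed() {
        return closed.get();
    }

    // Returns false instead of throwing once close() has been called.
    boolean trySend(String topic, byte[] value) {
        if (closed.get())
            return false;
        // producer.send(new ProducerRecord<>(topic, value)); // real send here
        return true;
    }

    void close() {
        if (closed.compareAndSet(false, true)) {
            // producer.close(); // real close here, runs exactly once
        }
    }
}
```

Background threads would call trySend() and check its return value instead of catching IllegalStateException.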
[jira] [Updated] (KAFKA-1644) Inherit FetchResponse from RequestOrResponse
[ https://issues.apache.org/jira/browse/KAFKA-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anton Karamanov updated KAFKA-1644:
    Attachment: 0003-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch

Indeed. Here's the updated [patch|^0003-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch].

Inherit FetchResponse from RequestOrResponse
    Key: KAFKA-1644
    URL: https://issues.apache.org/jira/browse/KAFKA-1644
    Project: Kafka
    Issue Type: Improvement
    Reporter: Anton Karamanov
    Assignee: Anton Karamanov
    Attachments: 0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch, 0002-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch, 0003-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch

Unlike all other Kafka API responses, {{FetchResponse}} is not a subclass of {{RequestOrResponse}}, which requires handling it as a special case while processing responses.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
No process listening on advertised.port
Hello There, I have the following in my server.properties on a two-node Kafka test cluster:

....
port=6667
host.name=f-bcpc-vm3.bcpc.example.com
advertised.host.name=f-bcpc-vm3.bcpc.example.com
advertised.port=9092
...

When I bring up Kafka, there is no process listening on port 9092, but Kafka is listening on 6667. As a result the test producer code (bundled with Kafka) is not able to connect to put messages. Any thoughts on why this is happening? The /etc/hosts file has the entries for the host name, and I am using Kafka 0.8.1. Any input on this is much appreciated. Thanks, Biju
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14161857#comment-14161857 ]

xueqiang wang commented on KAFKA-1646:

This issue is caused by the discrete blocks of a segment log file on the Windows NTFS file system. Unlike Linux, Windows doesn't allocate a large contiguous region on disk when creating a new file; it just finds free blocks as new data comes in and links them together. After a lot of segment log deletion and creation, the blocks of a log may be spread all over the disk, so when a consumer reads data from disk, performance degrades.

Improve consumer read performance for Windows
    Key: KAFKA-1646
    URL: https://issues.apache.org/jira/browse/KAFKA-1646
    Project: Kafka
    Issue Type: Improvement
    Components: log
    Affects Versions: 0.8.1.1
    Environment: Windows
    Reporter: xueqiang wang
    Labels: newbie, patch
    Attachments: Improve consumer read performance for Windows.patch

This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out contiguously on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.isWindows)' or adds methods used on Windows.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
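The allocation strategy described above - reserve a segment's full size up front so NTFS can hand out contiguous blocks, then trim to the real length on close - can be sketched as follows. This is an illustration of the idea, not the actual patch code; the class, method names, and explicit preallocation flag are assumptions:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Sketch: reserve the whole segment when the file is created, then
// shrink back to the bytes actually written when the segment is closed.
class SegmentPreallocator {
    static RandomAccessFile open(File f, long segmentBytes, boolean preallocate)
            throws IOException {
        RandomAccessFile raf = new RandomAccessFile(f, "rw");
        if (preallocate)                  // the patch would gate this on Windows
            raf.setLength(segmentBytes);  // reserve the space up front
        return raf;
    }

    static void trimAndClose(RandomAccessFile raf, long actualBytes) throws IOException {
        raf.setLength(actualBytes);       // drop the unused reserved tail
        raf.close();
    }
}
```

The trim step matters: without it, a rolled segment would permanently occupy its full configured size on disk.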
[jira] [Reopened] (KAFKA-1663) Controller unable to shutdown after a soft failure
[ https://issues.apache.org/jira/browse/KAFKA-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede reopened KAFKA-1663:

[~sriharsha], while talking to Jun, I realized that there may have been a regression introduced by the patch by removing the deleteTopicStateChanged.set(true) from startup(). The purpose of that call is to let the delete-topic thread resume topic deletion on startup for topics whose deletion was initiated on the previous controller. During the review, I assumed that the controller signals the delete-topic thread separately after startup, but that is not the case.

However, while reading through the code, I think there is a bug in the above case where the controller needs to resume topic deletion on startup. Basically, the way for the controller to notify the TopicDeletionManager to resume the thread is via the callers of resumeTopicDeletionThread(). Each of those caller APIs is protected by the controllerLock in KafkaController. However, awaitTopicDeletionNotification is not. So there is a window in which the controller might signal a thread that is not waiting on the same monitor. I think the main problem is having 2 locks - deleteLock and controllerLock. We might have to revisit that decision and see if we can consolidate on a single lock (controllerLock). Since this is a different bug, can you file it and link it back to this issue?

Controller unable to shutdown after a soft failure
    Key: KAFKA-1663
    URL: https://issues.apache.org/jira/browse/KAFKA-1663
    Project: Kafka
    Issue Type: Bug
    Reporter: Sriharsha Chintalapani
    Assignee: Sriharsha Chintalapani
    Priority: Blocker
    Fix For: 0.8.2
    Attachments: KAFKA-1663.patch

As part of testing KAFKA-1558 I came across a case where inducing a soft failure in the current controller elects a new controller, but the old controller doesn't shut down properly.

Steps to reproduce:
1) 5-broker cluster
2) high number of topics (I tested with 1000 topics)
3) on the current controller, do kill -SIGSTOP pid (the broker's process id)
4) wait a bit over the zookeeper timeout (server.properties)
5) kill -SIGCONT pid
6) a new controller will be elected; check the old controller's log

[2014-09-30 15:59:53,398] INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
[2014-09-30 15:59:53,400] INFO [delete-topics-thread-1], Shutting down (kafka.controller.TopicDeletionManager$DeleteTopicsThread)

If it stops there and the broker log keeps printing "Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)", then the controller shutdown never completes.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1663) Controller unable to shutdown after a soft failure
[ https://issues.apache.org/jira/browse/KAFKA-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14161942#comment-14161942 ]

Sriharsha Chintalapani commented on KAFKA-1663:

[~nehanarkhede] Both TopicDeletionManager.resumeTopicDeletionThread() and awaitTopicDeletionNotification use deleteLock, and DeleteTopicsThread.doWork() waits on awaitTopicDeletionNotification before it tries to acquire controllerLock. So a simple fix would be to check whether there are any topics in the topicsToBeDeleted set and call resumeTopicDeletionThread() from start(). I agree that it is best to consolidate on a single lock.

Controller unable to shutdown after a soft failure
    Key: KAFKA-1663
    URL: https://issues.apache.org/jira/browse/KAFKA-1663
    Project: Kafka
    Issue Type: Bug
    Reporter: Sriharsha Chintalapani
    Assignee: Sriharsha Chintalapani
    Priority: Blocker
    Fix For: 0.8.2
    Attachments: KAFKA-1663.patch

As part of testing KAFKA-1558 I came across a case where inducing a soft failure in the current controller elects a new controller, but the old controller doesn't shut down properly.

Steps to reproduce:
1) 5-broker cluster
2) high number of topics (I tested with 1000 topics)
3) on the current controller, do kill -SIGSTOP pid (the broker's process id)
4) wait a bit over the zookeeper timeout (server.properties)
5) kill -SIGCONT pid
6) a new controller will be elected; check the old controller's log

[2014-09-30 15:59:53,398] INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
[2014-09-30 15:59:53,400] INFO [delete-topics-thread-1], Shutting down (kafka.controller.TopicDeletionManager$DeleteTopicsThread)

If it stops there and the broker log keeps printing "Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)", then the controller shutdown never completes.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
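The lost-wakeup hazard discussed in this thread - a signal delivered while no thread is waiting on that monitor - is a classic condition-variable pitfall. A minimal sketch of the safe pattern, with signaller and waiter sharing one lock and the waiter re-checking a recorded flag (names here are illustrative, not Kafka's actual classes):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// A notification is only safe if (a) signaller and waiter use the same
// lock, and (b) the waiter re-checks a predicate, so a signal sent
// before the wait began is not lost.
class DeletionNotifier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition pending = lock.newCondition();
    private boolean workPending = false;

    void notifyDeletion() {
        lock.lock();
        try {
            workPending = true;   // record the event even if nobody waits yet
            pending.signalAll();
        } finally {
            lock.unlock();
        }
    }

    void awaitDeletion() throws InterruptedException {
        lock.lock();
        try {
            while (!workPending)  // loop guards against spurious wakeups too
                pending.await();
            workPending = false;
        } finally {
            lock.unlock();
        }
    }
}
```

With two independent locks, as in the deleteLock/controllerLock split being discussed, condition (a) cannot hold for both paths, which is the window Neha describes.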
[jira] [Created] (KAFKA-1681) Newly elected KafkaController might not start deletion of pending topics
Sriharsha Chintalapani created KAFKA-1681:

Summary: Newly elected KafkaController might not start deletion of pending topics
    Key: KAFKA-1681
    URL: https://issues.apache.org/jira/browse/KAFKA-1681
    Project: Kafka
    Issue Type: Bug
    Reporter: Sriharsha Chintalapani
    Priority: Blocker

As part of KAFKA-1663, deleteTopicStateChanged.set(true) was removed from start(). This will cause a newly elected Kafka controller not to process existing delete-topic requests.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Java New Producer IO Thread Name
Hi Kafka Dev Team, Since we can have multiple producer instances within one JVM, it would be a good idea to include the client.id configuration in the network IO thread's name, e.g. "kafka-producer-network-thread" + client.id. Thanks, Bhavesh
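The proposal can be illustrated with a tiny sketch; the separator and the client id value are assumptions, not the actual Kafka implementation:

```java
// Sketch of naming a producer's network IO thread after its client.id,
// so thread dumps distinguish multiple producers in one JVM.
class NamedIoThreadDemo {
    public static void main(String[] args) {
        String clientId = "orders-producer-1"; // would come from the client.id config
        Thread networkThread = new Thread(
                () -> { /* network IO loop */ },
                "kafka-producer-network-thread-" + clientId);
        System.out.println(networkThread.getName());
        // prints kafka-producer-network-thread-orders-producer-1
    }
}
```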
Re: No process listening on advertised.port
KAFKA-1092 (https://issues.apache.org/jira/browse/KAFKA-1092) provided the answer to the query. Thanks all.

On Tue, Oct 7, 2014 at 8:26 AM, Biju N bijuatapa...@gmail.com wrote:

Hello There, I have the following in my server.properties on a two-node Kafka test cluster:

....
port=6667
host.name=f-bcpc-vm3.bcpc.example.com
advertised.host.name=f-bcpc-vm3.bcpc.example.com
advertised.port=9092
...

When I bring up Kafka, there is no process listening on port 9092, but Kafka is listening on 6667. As a result the test producer code (bundled with Kafka) is not able to connect to put messages. Any thoughts on why this is happening? The /etc/hosts file has the entries for the host name, and I am using Kafka 0.8.1. Any input on this is much appreciated. Thanks, Biju
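For future readers: per KAFKA-1092, the broker binds only to `port`; the `advertised.*` settings are merely what gets registered in ZooKeeper for clients to dial. A sketch of the implication for the config in question (host name taken from the original message):

```properties
# The broker listens only on `port`. advertised.host.name/advertised.port
# are what the broker publishes to ZooKeeper, i.e. what clients connect to.
# Setting advertised.port=9092 while port=6667 only makes sense when a
# proxy or NAT forwards 9092 to 6667; otherwise nothing listens on 9092.
port=6667
advertised.host.name=f-bcpc-vm3.bcpc.example.com
advertised.port=9092
```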
[jira] [Updated] (KAFKA-1681) Newly elected KafkaController might not start deletion of pending topics
[ https://issues.apache.org/jira/browse/KAFKA-1681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1681:
    Fix Version/s: 0.8.2

Newly elected KafkaController might not start deletion of pending topics
    Key: KAFKA-1681
    URL: https://issues.apache.org/jira/browse/KAFKA-1681
    Project: Kafka
    Issue Type: Bug
    Reporter: Sriharsha Chintalapani
    Priority: Blocker
    Fix For: 0.8.2

As part of KAFKA-1663, deleteTopicStateChanged.set(true) was removed from start(). This will cause a newly elected Kafka controller not to process existing delete-topic requests.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14162106#comment-14162106 ]

Jay Kreps commented on KAFKA-1646:

Ah, you are saying Windows does a worse job of preallocation? Can you do some benchmark the performance improvement?

Improve consumer read performance for Windows
    Key: KAFKA-1646
    URL: https://issues.apache.org/jira/browse/KAFKA-1646
    Project: Kafka
    Issue Type: Improvement
    Components: log
    Affects Versions: 0.8.1.1
    Environment: Windows
    Reporter: xueqiang wang
    Labels: newbie, patch
    Attachments: Improve consumer read performance for Windows.patch

This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out contiguously on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.isWindows)' or adds methods used on Windows.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [Java New Producer Kafka Trunk ] Need a State Check API Method
Hey Bhavesh, But isn't the problem here that you are trying to send messages after closing the producer? I think what I am saying is that since calling close is something the user initiates, we don't need an API for this--you can keep track of this yourself, right? -Jay

On Mon, Oct 6, 2014 at 11:11 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

I agree that if the producer is closed and you then try to send a message, it will report that it is closed. What we have done is wrap the NEW Producer API behind our existing Old Producer API wrapper, and when I run the same code against the OLD producer I do not see this issue; it only occurs with the NEW producer. Regardless of the close-state question, I think it would be good to have an API to check the state of the producer (at least an isClosed() method). If you agree, I can file a Jira request for a state-check API; let me know which flavor of state-check API you prefer. Thanks, Bhavesh

On Mon, Oct 6, 2014 at 9:34 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Bhavesh, This is a sanity check. If you send a message after calling close on the producer, you should get this error. It sounds like you have multiple threads sending, you close the producer in the middle of this, and then you get this error. This is expected. Perhaps I am misunderstanding? I think tracking the state (i.e. whether you have called close or not) can be done just as easily in your code, right? -Jay

On Sun, Oct 5, 2014 at 7:32 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Kafka Dev Team, *java.lang.IllegalStateException: Cannot send after the producer is closed.* The above seems to be a bug. If a ProducerRecord is in flight inside the send() method while another thread shuts the producer down, the send will get this error. Thanks, Bhavesh

On Sun, Oct 5, 2014 at 7:15 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Kafka Dev Team, The use case is that we need to know the producer state in background threads so we can decide whether to submit a message. This seems to be a bug in trunk code. I have noticed that KafkaProducer itself does not expose its closed state, and in-flight messages will encounter the following exception. Should I file a bug for this issue?

java.lang.IllegalStateException: Cannot send after the producer is closed.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:136)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:237)
    ...
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)

Thanks, Bhavesh

On Sun, Oct 5, 2014 at 3:30 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Kafka Dev, I would like to request a producer state-check API so I can manage the life cycle of the producer better. If you guys agree, I will file a Jira request. For example, based on the state I could start (create a new instance of the producer), restart, or close it. I just gave example states; you may add or remove states.

/***
 * API TO CHECK STATE OF PRODUCER
 * @return STATE.INIT_IN_PROGRESS, STATE.INIT_DONE, STATE.RUNNING,
 *         STATE.CLOSE_REQUESTED, STATE.CLOSE_IN_PROGRESS, STATE.CLOSED
 */
public State getCurrentState();

Thanks, Bhavesh
Re: Review Request 25995: Patch for KAFKA-1650
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review55612
---

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment95974
    Do we need to add = here?

core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/25995/#comment95975
    We should keep the changes of KAFKA-1647 in its own RB and not merge them here.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95978
    Could we add some introductory comments here on:
    1. The architecture of the MM: producer / consumer threads, the data channel per producer thread, the offset commit thread, and how the different modules interact with each other.
    2. Why we need a separate offset commit thread, and how it works.
    3. The startup / shutdown process, i.e. which modules to start / shut down first (this could also be moved to the head of the corresponding functions).

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95979
    Embedded consumer config for consuming from the source cluster.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95980
    Embedded producer config for producing to the target cluster.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95981
    The offset commit thread periodically commits consumed offsets to the source cluster. With the new producer, the offsets are updated upon the returned future metadata of the send() call; with the old producer, the offsets are updated as the consumer's iterator advances. By doing this, no data loss occurs even when the mirror maker is uncleanly shut down with the new producer, while with the old producer, messages inside the data channel could be lost upon an unclean mirror maker shutdown.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96019
    numMessageCapacity and byteCapacity? numGetters and numPutters (since the producer is the consumer of this buffer and vice versa)?

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96021
    How about MirrorMaker-DataChannel-queue%d-NumMessages and MirrorMaker-DataChannel-queue%d-Bytes? And variable names channelNumMessageHists and channelByteHists?

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96020
    Can we define put(record, queueId) and put(record), where the latter includes the logic for determining the queueId and then calls the former?

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96022
    Comment on why we use the hashCode of the source topic / partition here.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96026
    Instead of letting the consumer check the global shutdown flag, could we just add a shutdown function which sets its own flag, like the producer thread and the commit thread? Then the shutdown process becomes: consumers.shutdown, consumers.awaitShutdown, producers.shutdown, producers.awaitShutdown, committer.shutdown, committer.awaitShutdown, connector.shutdown.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96023
    Maybe just "// if it exits accidentally, stop the entire mirror maker" as we did below?

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96024
    // if it exits accidentally, stop the entire mirror maker

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96025
    // the committed offset will be the first offset of the un-consumed message, hence we need to increment by one.

core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/25995/#comment96027
    queueNumItemCapacity and queueByteCapacity?

- Guozhang Wang

On Oct. 6, 2014, 5:20 p.m., Jiangjie Qin wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Oct. 6, 2014, 5:20 p.m.)

Review request for kafka.

Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650

Repository: kafka

Description
---
Addressed Guozhang's comments. Talked with Joel and decided to remove multi-connector support, as people can always create multiple mirror maker instances if they want to consume from multiple clusters.

Diffs
-
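The shutdown ordering proposed in comment 96026 - stop data sources first, then sinks, then the committer, waiting for each stage to finish before stopping the next - can be sketched generically. The Stoppable interface and class names here are illustrative, not MirrorMaker's actual types:

```java
import java.util.List;

// Each pipeline stage can be asked to stop and then awaited, so no
// stage receives work after it has shut down.
interface Stoppable {
    void shutdown();       // ask the stage's thread(s) to stop
    void awaitShutdown();  // block until they have stopped
}

class OrderedShutdown {
    // Shut stages down strictly in list order: upstream before downstream.
    static void shutdownAll(List<Stoppable> stages) {
        for (Stoppable s : stages) {
            s.shutdown();
            s.awaitShutdown(); // drain this stage fully before the next
        }
    }
}
```

Called with [consumers, producers, committer, connector], this reproduces the sequence from the review comment.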
[jira] [Comment Edited] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14162106#comment-14162106 ]

Jay Kreps edited comment on KAFKA-1646 at 10/7/14 6:23 PM:

Ah, you are saying Windows does a worse job of preallocation? How much does this help? Did you do any benchmarking on the performance improvement?

was (Author: jkreps): Ah, you are saying Windows does a worse job of preallocation? Can you do some benchmark the performance improvement?

Improve consumer read performance for Windows
    Key: KAFKA-1646
    URL: https://issues.apache.org/jira/browse/KAFKA-1646
    Project: Kafka
    Issue Type: Improvement
    Components: log
    Affects Versions: 0.8.1.1
    Environment: Windows
    Reporter: xueqiang wang
    Labels: newbie, patch
    Attachments: Improve consumer read performance for Windows.patch

This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out contiguously on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.isWindows)' or adds methods used on Windows.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1499) Broker-side compression configuration
[ https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14162336#comment-14162336 ]

Joel Koshy commented on KAFKA-1499:

Talked to Jay offline; here is a summary of what we discussed. The main motivations for this feature (currently) are:
# Log compaction
# Ensuring messages on the broker are compressed if a bunch of producers happen to send messages uncompressed - say, if all producers in an organization happened to pick up a bad config over time
# Ensuring messages on the broker are compressed with a specific compression type - perhaps if downstream consumers want only that compression type

For the first use case, we can potentially get around it as described above by picking any compression type - i.e., if we are writing out a batch of messages that contains various compression types, we can just pick one of those types. This is not as neat as having an explicit target compression type, but it seems reasonable. The second and third use cases are likely only marginally useful. So we have a couple of options:
* Do nothing - given that we have (what seems to be) a reasonable approach for dealing with log compaction. I.e., we can table this and revisit if we have a very compelling use case for it.
* Add the compression.type config as a server config and topic-override config.
** To address the concern of forgetting to set this (or misconfiguring it), there are two approaches:
*** Make it an optional configuration, as mentioned further above
*** Have it default to compression.type "producer" - which means use whatever compression type the producer used.

Broker-side compression configuration
    Key: KAFKA-1499
    URL: https://issues.apache.org/jira/browse/KAFKA-1499
    Project: Kafka
    Issue Type: New Feature
    Reporter: Joel Koshy
    Assignee: Manikumar Reddy
    Labels: newbie++
    Fix For: 0.8.2
    Attachments: KAFKA-1499.patch, KAFKA-1499.patch, KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, KAFKA-1499_2014-09-25_11:05:57.patch
    Original Estimate: 72h
    Remaining Estimate: 72h

A given topic can have messages in mixed compression codecs, i.e., it can also have a mix of uncompressed/compressed messages. It would be useful to support a broker-side configuration to recompress messages to a specific compression codec, i.e., all messages (for all topics) on the broker will be compressed to this codec. We could have per-topic overrides as well.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
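The last option discussed in the comment can be shown as a hypothetical config fragment. The setting names follow the proposal in this thread, not any Kafka release current at the time of the discussion:

```properties
# Proposed broker-side default: "producer" means keep whatever codec
# the producer used, so a correctly configured producer is unaffected.
compression.type=producer

# Proposed per-topic override (via the topic-level config mechanism),
# e.g. recompress one topic's messages to gzip for log compaction:
# compression.type=gzip
```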
Re: Review Request 26390: Fix KAFKA-1641
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26390/#review55696
---

core/src/main/scala/kafka/log/LogCleanerManager.scala
https://reviews.apache.org/r/26390/#comment96074
    Can we switch to case (topicPartition, log)?

core/src/main/scala/kafka/log/LogCleanerManager.scala
https://reviews.apache.org/r/26390/#comment96073
    firstDirtyOffset = logStartOffset
    Personally, I prefer this style for these kinds of statement sequences since all the logic for setting this val is clearly contained, but again that's just a preference - I'm okay with the vars also.
```
val firstDirtyOffset = {
  val offset = lastClean.getOrElse(l._1, logStartOffset)
  if (offset < logStartOffset) {
    error(...)
    logStartOffset
  } else offset
}
```

core/src/main/scala/kafka/log/LogCleanerManager.scala
https://reviews.apache.org/r/26390/#comment96075
    It seems this should be `LogToClean(l._1, l._2, firstDirtyOffset)`

- Joel Koshy

On Oct. 6, 2014, 10:21 p.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26390/
---

(Updated Oct. 6, 2014, 10:21 p.m.)

Review request for kafka.

Bugs: KAFKA-1641
https://issues.apache.org/jira/browse/KAFKA-1641

Repository: kafka

Description
---
Reset cleaning start offset upon abnormal log truncation

Diffs
-
core/src/main/scala/kafka/log/LogCleanerManager.scala e8ced6a5922508ea3274905be7c3d6e728f320ac

Diff: https://reviews.apache.org/r/26390/diff/

Testing
---

Thanks,
Guozhang Wang
[jira] [Updated] (KAFKA-1670) Corrupt log files for segment.bytes values close to Int.MaxInt
[ https://issues.apache.org/jira/browse/KAFKA-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sriharsha Chintalapani updated KAFKA-1670:
    Attachment: KAFKA-1670_2014-10-07_13:39:13.patch

Corrupt log files for segment.bytes values close to Int.MaxInt
    Key: KAFKA-1670
    URL: https://issues.apache.org/jira/browse/KAFKA-1670
    Project: Kafka
    Issue Type: Bug
    Affects Versions: 0.8.1.1
    Reporter: Ryan Berdeen
    Assignee: Sriharsha Chintalapani
    Priority: Blocker
    Fix For: 0.8.2
    Attachments: KAFKA-1670.patch, KAFKA-1670_2014-10-04_20:17:46.patch, KAFKA-1670_2014-10-06_09:48:25.patch, KAFKA-1670_2014-10-07_13:39:13.patch

The maximum value for the topic-level config {{segment.bytes}} is {{Int.MaxInt}} (2147483647). *Using this value causes brokers to corrupt their log files, leaving them unreadable.* We set {{segment.bytes}} to {{2122317824}}, which is well below the maximum. One by one, the ISR of all partitions shrank to 1. Brokers would crash when restarted, attempting to read from a negative offset in a log file. After discovering that many segment files had grown to 4GB or more, we were forced to shut down our *entire production Kafka cluster* for several hours while we split all segment files into 1GB chunks.

Looking into the {{kafka.log}} code, the {{segment.bytes}} parameter is used inconsistently. It is treated as a *soft* maximum for the size of the segment file (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/LogConfig.scala#L26), with logs rolled only after (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/Log.scala#L246) they exceed this value. However, much of the code that deals with log files uses *ints* to store the size of the file and the position in the file. Overflow of these ints leads the broker to append to the segments indefinitely, and to fail to read these segments for consuming or recovery.

This is trivial to reproduce:
{code}
$ bin/kafka-topics.sh --topic segment-bytes-test --create --replication-factor 2 --partitions 1 --zookeeper zkhost:2181
$ bin/kafka-topics.sh --topic segment-bytes-test --alter --config segment.bytes=2147483647 --zookeeper zkhost:2181
$ yes "Int.MaxValue is a ridiculous bound on file size in 2014" | bin/kafka-console-producer.sh --broker-list localhost:6667 zkhost:2181 --topic segment-bytes-test
{code}

After running for a few minutes, the log file is corrupt:
{code}
$ ls -lh data/segment-bytes-test-0/
total 9.7G
-rw-r--r-- 1 root root 10M Oct 3 19:39 .index
-rw-r--r-- 1 root root 9.7G Oct 3 19:39 .log
{code}

We recovered the data from the log files using a simple Python script: https://gist.github.com/also/9f823d9eb9dc0a410796

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
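The int-overflow mechanism the report describes can be demonstrated in isolation. A small self-contained sketch (not Kafka code): once a file grows past Int.MaxValue bytes, an int-typed position wraps negative, which matches the "read from a negative offset" crashes above.

```java
// Demonstrates why int-typed file positions break past 2 GiB.
class OverflowDemo {
    public static void main(String[] args) {
        long fileSize = 2147483647L + 1024; // segment grew past Int.MaxValue
        int position = (int) fileSize;      // the int-typed position wraps
        System.out.println(position);       // prints -2147482625
        assert position < 0;                // reads from here must fail
    }
}
```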
Re: Review Request 26346: Patch for KAFKA-1670
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26346/
---

(Updated Oct. 7, 2014, 8:39 p.m.)

Review request for kafka.

Bugs: KAFKA-1670
https://issues.apache.org/jira/browse/KAFKA-1670

Repository: kafka

Description (updated)
---
KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.
KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.
KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.
Merge remote-tracking branch 'origin/trunk' into KAFKA-1670

Diffs (updated)
-
core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala PRE-CREATION
core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b
core/src/test/scala/unit/kafka/log/LogManagerTest.scala 59bd8a981b3fb8595dd6e790a30071092978a88d
core/src/test/scala/unit/kafka/log/LogTest.scala 577d102fc2eb6bb1a72326141ecd431db6d66f04
core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9556ed92c61ffee5423be962bcdbe64c71e1f2fa

Diff: https://reviews.apache.org/r/26346/diff/

Testing
---

Thanks,
Sriharsha Chintalapani
[jira] [Commented] (KAFKA-1670) Corrupt log files for segment.bytes values close to Int.MaxInt
[ https://issues.apache.org/jira/browse/KAFKA-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14162486#comment-14162486 ]

Sriharsha Chintalapani commented on KAFKA-1670:
-----------------------------------------------

Updated reviewboard https://reviews.apache.org/r/26346/diff/ against branch origin/trunk

Corrupt log files for segment.bytes values close to Int.MaxInt
--------------------------------------------------------------
                 Key: KAFKA-1670
                 URL: https://issues.apache.org/jira/browse/KAFKA-1670
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8.1.1
            Reporter: Ryan Berdeen
            Assignee: Sriharsha Chintalapani
            Priority: Blocker
             Fix For: 0.8.2
         Attachments: KAFKA-1670.patch, KAFKA-1670_2014-10-04_20:17:46.patch, KAFKA-1670_2014-10-06_09:48:25.patch, KAFKA-1670_2014-10-07_13:39:13.patch

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26346: Patch for KAFKA-1670
On Oct. 7, 2014, 12:42 a.m., Jay Kreps wrote:

    core/src/main/scala/kafka/log/Log.scala, line 502
    https://reviews.apache.org/r/26346/diff/3/?file=714142#file714142line502

    It is a bit subtle that you are checking for overflow this way. What we mean to check is just that there is sufficient room in the segment for this message, which I think we can do by checking: segment.size > config.segmentSize - messagesSize

Thanks Jay and Jun for the review and suggestions. Please check the latest patch.

- Sriharsha

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26346/#review55619
-----------------------------------------------------------

On Oct. 7, 2014, 8:39 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26346/ --- (Updated Oct. 7, 2014, 8:39 p.m.) Review request for kafka. Bugs: KAFKA-1670 https://issues.apache.org/jira/browse/KAFKA-1670 Repository: kafka Description --- KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. Merge remote-tracking branch 'origin/trunk' into KAFKA-1670 Diffs - core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala PRE-CREATION core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/test/scala/unit/kafka/log/LogManagerTest.scala 59bd8a981b3fb8595dd6e790a30071092978a88d core/src/test/scala/unit/kafka/log/LogTest.scala 577d102fc2eb6bb1a72326141ecd431db6d66f04 core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9556ed92c61ffee5423be962bcdbe64c71e1f2fa Diff: https://reviews.apache.org/r/26346/diff/ Testing --- Thanks, Sriharsha Chintalapani
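The rearranged comparison Jay suggests sidesteps overflow: adding segment size and message-set size can wrap negative near 2 GB, while subtracting the message-set size from the configured maximum stays in range. A small Java sketch of the two forms (method and variable names are illustrative, not the actual Log.scala code):

```java
public class SegmentRoomCheck {
    // Overflow-prone form: segmentSize + messageSetSize can wrap negative,
    // so the "segment is full" check silently passes and the file keeps growing.
    static boolean isFullUnsafe(int segmentSize, int messageSetSize, int maxSegmentBytes) {
        return segmentSize + messageSetSize > maxSegmentBytes;
    }

    // Rearranged form per the review comment: a subtraction of two
    // non-negative ints cannot overflow, so the check stays correct
    // even when maxSegmentBytes is Int.MaxValue.
    static boolean isFullSafe(int segmentSize, int messageSetSize, int maxSegmentBytes) {
        return segmentSize > maxSegmentBytes - messageSetSize;
    }

    public static void main(String[] args) {
        int maxSegmentBytes = Integer.MAX_VALUE;  // segment.bytes = 2147483647
        int segmentSize = Integer.MAX_VALUE - 50; // nearly full segment
        int messageSetSize = 100;                 // incoming message set

        System.out.println(isFullUnsafe(segmentSize, messageSetSize, maxSegmentBytes)); // false: the sum wrapped negative
        System.out.println(isFullSafe(segmentSize, messageSetSize, maxSegmentBytes));   // true: correctly detects a full segment
    }
}
```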
Re: Review Request 26346: Patch for KAFKA-1670
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26346/
-----------------------------------------------------------

(Updated Oct. 7, 2014, 8:49 p.m.)

Review request for kafka.

Bugs: KAFKA-1670
    https://issues.apache.org/jira/browse/KAFKA-1670

Repository: kafka

Description (updated)
-------

KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.
KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.
KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.
KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.

Diffs (updated)
-----

  core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala PRE-CREATION
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala 59bd8a981b3fb8595dd6e790a30071092978a88d
  core/src/test/scala/unit/kafka/log/LogTest.scala 577d102fc2eb6bb1a72326141ecd431db6d66f04
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9556ed92c61ffee5423be962bcdbe64c71e1f2fa

Diff: https://reviews.apache.org/r/26346/diff/

Testing
-------

Thanks,
Sriharsha Chintalapani
[jira] [Commented] (KAFKA-1670) Corrupt log files for segment.bytes values close to Int.MaxInt
[ https://issues.apache.org/jira/browse/KAFKA-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14162507#comment-14162507 ]

Sriharsha Chintalapani commented on KAFKA-1670:
-----------------------------------------------

Updated reviewboard https://reviews.apache.org/r/26346/diff/ against branch origin/trunk

Corrupt log files for segment.bytes values close to Int.MaxInt
--------------------------------------------------------------
                 Key: KAFKA-1670
                 URL: https://issues.apache.org/jira/browse/KAFKA-1670
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8.1.1
            Reporter: Ryan Berdeen
            Assignee: Sriharsha Chintalapani
            Priority: Blocker
             Fix For: 0.8.2
         Attachments: KAFKA-1670.patch, KAFKA-1670_2014-10-04_20:17:46.patch, KAFKA-1670_2014-10-06_09:48:25.patch, KAFKA-1670_2014-10-07_13:39:13.patch, KAFKA-1670_2014-10-07_13:49:10.patch

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26291: Patch for KAFKA-1648
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26291/#review55702
-----------------------------------------------------------

core/src/main/scala/kafka/consumer/PartitionAssignor.scala
https://reviews.apache.org/r/26291/#comment96078

    You could just do if (ctx.consumersForTopic.size = 0)

core/src/main/scala/kafka/consumer/PartitionAssignor.scala
https://reviews.apache.org/r/26291/#comment96080

    Did you mean to return this?

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/26291/#comment96089

    I don't think this is required. (Or if it is can you explain?)

- Joel Koshy

On Oct. 5, 2014, 12:40 a.m., Mayuresh Gharat wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26291/ --- (Updated Oct. 5, 2014, 12:40 a.m.) Review request for kafka. Bugs: KAFKA-1648 https://issues.apache.org/jira/browse/KAFKA-1648 Repository: kafka Description --- Removed the unnecessary comment Diffs - core/src/main/scala/kafka/consumer/PartitionAssignor.scala 8ea7368dc394a497164ea093ff8e9f2e6a94b1de core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b Diff: https://reviews.apache.org/r/26291/diff/ Testing --- Thanks, Mayuresh Gharat
Re: Review Request 26346: Patch for KAFKA-1670
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26346/#review55720
-----------------------------------------------------------

core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala
https://reviews.apache.org/r/26346/#comment96114

    We need to add this exception to ErrorMapping and Errors. We also need to add this class to org.apache.kafka.common.errors in the client.

core/src/test/scala/unit/kafka/log/LogTest.scala
https://reviews.apache.org/r/26346/#comment96104

    By increasing the segment size to 100, does the log still roll on every message as indicated by the comment?

- Jun Rao

On Oct. 7, 2014, 8:49 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26346/ --- (Updated Oct. 7, 2014, 8:49 p.m.) Review request for kafka. Bugs: KAFKA-1670 https://issues.apache.org/jira/browse/KAFKA-1670 Repository: kafka Description --- KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. Diffs - core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala PRE-CREATION core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/test/scala/unit/kafka/log/LogManagerTest.scala 59bd8a981b3fb8595dd6e790a30071092978a88d core/src/test/scala/unit/kafka/log/LogTest.scala 577d102fc2eb6bb1a72326141ecd431db6d66f04 core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9556ed92c61ffee5423be962bcdbe64c71e1f2fa Diff: https://reviews.apache.org/r/26346/diff/ Testing --- Thanks, Sriharsha Chintalapani
Re: Review Request 26346: Patch for KAFKA-1670
On Oct. 7, 2014, 11:03 p.m., Jun Rao wrote:

    core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala, line 20
    https://reviews.apache.org/r/26346/diff/5/?file=714846#file714846line20

    We need to add this exception to ErrorMapping and Errors. We also need to add this class to org.apache.kafka.common.errors in the client.

sorry I missed it

On Oct. 7, 2014, 11:03 p.m., Jun Rao wrote:

    core/src/test/scala/unit/kafka/log/LogTest.scala, line 242
    https://reviews.apache.org/r/26346/diff/5/?file=714849#file714849line242

    By increasing the segment size to 100, does the log still roll on every message as indicated by the comment?

yes it rolls on every messageset. I can add an assert to test that if required.

- Sriharsha

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26346/#review55720
-----------------------------------------------------------

On Oct. 7, 2014, 8:49 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26346/ --- (Updated Oct. 7, 2014, 8:49 p.m.) Review request for kafka. Bugs: KAFKA-1670 https://issues.apache.org/jira/browse/KAFKA-1670 Repository: kafka Description --- KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. Diffs - core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala PRE-CREATION core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/test/scala/unit/kafka/log/LogManagerTest.scala 59bd8a981b3fb8595dd6e790a30071092978a88d core/src/test/scala/unit/kafka/log/LogTest.scala 577d102fc2eb6bb1a72326141ecd431db6d66f04 core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9556ed92c61ffee5423be962bcdbe64c71e1f2fa Diff: https://reviews.apache.org/r/26346/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Created] (KAFKA-1682) Security for Kafka
Jay Kreps created KAFKA-1682: Summary: Security for Kafka Key: KAFKA-1682 URL: https://issues.apache.org/jira/browse/KAFKA-1682 Project: Kafka Issue Type: New Feature Affects Versions: 0.9.0 Reporter: Jay Kreps -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1682) Security for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1682: - Description: Parent ticket for security. Wiki and discussion is here: https://cwiki.apache.org/confluence/display/KAFKA/Security Security for Kafka -- Key: KAFKA-1682 URL: https://issues.apache.org/jira/browse/KAFKA-1682 Project: Kafka Issue Type: New Feature Affects Versions: 0.9.0 Reporter: Jay Kreps Parent ticket for security. Wiki and discussion is here: https://cwiki.apache.org/confluence/display/KAFKA/Security -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1683) Implement a session concept in the socket server
Jay Kreps created KAFKA-1683:
--------------------------------

             Summary: Implement a session concept in the socket server
                 Key: KAFKA-1683
                 URL: https://issues.apache.org/jira/browse/KAFKA-1683
             Project: Kafka
          Issue Type: Sub-task
    Affects Versions: 0.9.0
            Reporter: Jay Kreps

To implement authentication we need a way to keep track of some things between requests. The initial use for this would be remembering the authenticated user/principal info, but likely more uses would come up (for example we will also need to remember whether and which encryption or integrity measures are in place on the socket so we can wrap and unwrap writes and reads). I was thinking we could just add a Session object that might have a user field. The session object would need to get added to RequestChannel.Request so it is passed down to the API layer with each request.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
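The shape described in the ticket can be sketched in a few lines. This is only a hedged illustration of the idea (field and class names are guesses, not a committed Kafka API): a Session created once at authentication time, then carried on every request so the API layer can see the principal.

```java
public class SessionSketch {
    // Hypothetical session holder per the ticket: created when a connection
    // authenticates, remembered for the lifetime of the socket.
    static final class Session {
        final String principal; // authenticated user/principal info
        Session(String principal) { this.principal = principal; }
    }

    // Hypothetical stand-in for RequestChannel.Request gaining a session
    // field so each request carries its caller's identity downstream.
    static final class Request {
        final Session session;
        final String api;
        Request(Session session, String api) { this.session = session; this.api = api; }
    }

    public static void main(String[] args) {
        Session s = new Session("alice");           // set once at handshake/auth time
        Request r = new Request(s, "FetchRequest"); // reused for every request on the socket
        System.out.println(r.session.principal);    // the API layer can now authorize per user
    }
}
```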
[jira] [Created] (KAFKA-1684) Implement SSL authentication
Jay Kreps created KAFKA-1684: Summary: Implement SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Affects Versions: 0.9.0 Reporter: Jay Kreps Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
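The wrap/unwrap constraint called out in the ticket can be seen with a bare SSLEngine, no broker involved. The hedged sketch below uses only the standard JSSE API (not Kafka code): every outgoing TLS byte must be produced in userspace by wrap(), which is exactly why a kernel-side FileChannel.transferTo cannot be used on encrypted connections.

```java
import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;

public class SslWrapSketch {
    public static void main(String[] args) throws Exception {
        // Client-side SSLEngine; no network is needed to observe wrap().
        SSLEngine engine = SSLContext.getDefault().createSSLEngine();
        engine.setUseClientMode(true);
        engine.beginHandshake(); // the engine now wants to emit a ClientHello

        // wrap() translates application bytes into TLS record bytes. During
        // the handshake it produces data even from an empty source buffer.
        ByteBuffer appData = ByteBuffer.allocate(0);
        ByteBuffer netData = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
        SSLEngineResult result = engine.wrap(appData, netData);

        // Handshake bytes were generated in userspace; on a plaintext socket
        // this step does not exist, which is what makes transferTo possible there.
        System.out.println(result.bytesProduced() > 0); // true
    }
}
```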
[jira] [Updated] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1684: - Summary: Implement TLS/SSL authentication (was: Implement SSL authentication) Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Affects Versions: 0.9.0 Reporter: Jay Kreps Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1685) Implement TLS/SSL tests
Jay Kreps created KAFKA-1685:
--------------------------------

             Summary: Implement TLS/SSL tests
                 Key: KAFKA-1685
                 URL: https://issues.apache.org/jira/browse/KAFKA-1685
             Project: Kafka
          Issue Type: Sub-task
    Affects Versions: 0.9.0
            Reporter: Jay Kreps
             Fix For: 0.9.0

We need to write a suite of unit tests for TLS authentication. This should be doable with a junit integration test. We can use the simple authorization plugin with only a single user whitelisted. The test can start the server and then connect with and without TLS and validate that access is only possible when authenticated.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25886/#review55730
-----------------------------------------------------------

Looks good. Just some minor comments below.

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
https://reviews.apache.org/r/25886/#comment96119

    Perhaps the message can be "Messages are rejected since there are fewer in-sync replicas than required.".

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
https://reviews.apache.org/r/25886/#comment96120

    How about "Messages are written to the log, but to fewer in-sync replicas than required."?

core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/25886/#comment96123

    The error code should be NotEnoughReplicasAfterAppendCode.

core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
https://reviews.apache.org/r/25886/#comment96124

    Probably add that messages are rejected.

core/src/main/scala/kafka/log/LogConfig.scala
https://reviews.apache.org/r/25886/#comment96125

    -1 (or all)

core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/25886/#comment96128

    -1 (all)

- Jun Rao

On Oct. 6, 2014, 8:28 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/ --- (Updated Oct. 6, 2014, 8:28 p.m.) Review request for kafka.
Repository: kafka Description --- KAFKA-1555: provide strong consistency with reasonable availability Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java f9de4af clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java addc906 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 core/src/main/scala/kafka/cluster/Partition.scala ff106b4 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala PRE-CREATION core/src/main/scala/kafka/common/NotEnoughReplicasException.scala PRE-CREATION core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 core/src/main/scala/kafka/producer/SyncProducerConfig.scala 69b2d0c core/src/main/scala/kafka/server/KafkaApis.scala c584b55 core/src/main/scala/kafka/server/KafkaConfig.scala 165c816 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 39f777b core/src/test/scala/unit/kafka/producer/ProducerTest.scala dd71d81 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c Diff: https://reviews.apache.org/r/25886/diff/ Testing --- With 3 broker cluster, created 3 topics each with 1 partition and 3 replicas, with 1,3 and 4 min.insync.replicas. * min.insync.replicas=1 behaved normally (all writes succeeded as long as a broker was up) * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and one broker was down * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1 See notes about retry behavior in the JIRA. Thanks, Gwen Shapira
[jira] [Created] (KAFKA-1686) Implement SASL/Kerberos
Jay Kreps created KAFKA-1686: Summary: Implement SASL/Kerberos Key: KAFKA-1686 URL: https://issues.apache.org/jira/browse/KAFKA-1686 Project: Kafka Issue Type: Sub-task Affects Versions: 0.9.0 Reporter: Jay Kreps Fix For: 0.9.0 Implement SASL/Kerberos authentication. To do this we will need to introduce a new SASLRequest and SASLResponse pair to the client protocol. This request and response will each have only a single byte[] field and will be used to handle the SASL challenge/response cycle. Doing this will initialize the SaslServer instance and associate it with the session in a manner similar to KAFKA-1684. When using integrity or encryption mechanisms with SASL we will need to wrap and unwrap bytes as in KAFKA-1684 so the same interface that covers the SSLEngine will need to also cover the SaslServer instance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1687) SASL tests
Jay Kreps created KAFKA-1687: Summary: SASL tests Key: KAFKA-1687 URL: https://issues.apache.org/jira/browse/KAFKA-1687 Project: Kafka Issue Type: Sub-task Reporter: Jay Kreps We need tests for our SASL/Kerberos setup. This is not that easy to do with Kerberos because of the dependency on the KDC. However possibly we can test with another SASL mechanism that doesn't have that dependency? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14162833#comment-14162833 ]

Jun Rao commented on KAFKA-1555:
--------------------------------

The patch that Gwen provided (using a min.isr topic level config) looks good to me (other than a few minor comments). If anyone else is interested in reviewing, please take another look. If there is no objection, I will most likely commit the patch once the remaining minor comments are resolved.

provide strong consistency with reasonable availability
-------------------------------------------------------
                 Key: KAFKA-1555
                 URL: https://issues.apache.org/jira/browse/KAFKA-1555
             Project: Kafka
          Issue Type: Improvement
          Components: controller
    Affects Versions: 0.8.1.1
            Reporter: Jiang Wu
            Assignee: Gwen Shapira
             Fix For: 0.8.2
         Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch

In a mission critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases, such as two brokers down, service can be blocked, but no message loss happens.

We found that the current Kafka version (0.8.1.1) cannot achieve these requirements due to three behaviors:
1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader.
2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader.
3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker.

The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas (leader A, followers B and C) are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases:
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost.

In summary, no existing configuration can satisfy the requirements.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
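The min.isr approach under review targets case 3 above: with acks=-1 plus a minimum ISR size, a write is either stored on at least that many brokers or rejected outright. A hedged sketch of how it might be used once the patch lands (topic name and hosts are made up; min.insync.replicas as a topic-level config is what this patch adds, so exact syntax may differ):

```shell
# Create a 3-replica topic that requires at least 2 in-sync replicas
# before an acks=-1 produce is acknowledged (hypothetical topic/hosts).
bin/kafka-topics.sh --zookeeper zkhost:2181 --create --topic critical-events \
  --partitions 1 --replication-factor 3 --config min.insync.replicas=2
```

Combined with request.required.acks=-1 on the producer, losing one broker keeps writes available (2 of 3 replicas still in sync), while every acknowledged message is guaranteed on at least 2 brokers — matching the trade-off in the JIRA title and Gwen's testing notes (min.insync.replicas=3 or 4 returned NotEnoughReplicas when a broker was down).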
Kafka AWS deployment + UI console
We are going to deploy Kafka in production and also monitor it via a console (e.g. state of partitions in a broker, leader and slaves, state of consumers). Is there an out-of-the-box solution? What is the best and most efficient way of deployment and monitoring? Has someone tried this? It looks promising: http://www.michael-noll.com/blog/2014/03/17/wirbelsturm-one-click-deploy-storm-kafka-clusters-with-vagrant-puppet/ -- Kind Regards, Shafaq
[jira] [Created] (KAFKA-1688) Add authorization interface and naive implementation
Jay Kreps created KAFKA-1688: Summary: Add authorization interface and naive implementation Key: KAFKA-1688 URL: https://issues.apache.org/jira/browse/KAFKA-1688 Project: Kafka Issue Type: Sub-task Reporter: Jay Kreps Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14162839#comment-14162839 ] Jay Kreps commented on KAFKA-1684: -- We may be able to get most of the implementation for this from KAFKA-1477? Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Affects Versions: 0.9.0 Reporter: Jay Kreps Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Security JIRAS
Hey guys, As promised, I added a tree of JIRAs for the stuff in the security wiki ( https://cwiki.apache.org/confluence/display/KAFKA/Security): https://issues.apache.org/jira/browse/KAFKA-1682 I tried to break it into reasonably standalone pieces. I think many of the tickets could actually be done in parallel. Since there were many people interested in this area this may help parallelize the work a bit. I added some strawman details on implementation to each ticket. We can discuss and refine further on the individual tickets. Please take a look and let me know if this breakdown seems reasonable. Cheers, -Jay
Re: Review Request 26346: Patch for KAFKA-1670
On Oct. 7, 2014, 11:03 p.m., Jun Rao wrote:

    core/src/test/scala/unit/kafka/log/LogTest.scala, line 242
    https://reviews.apache.org/r/26346/diff/5/?file=714849#file714849line242

    By increasing the segment size to 100, does the log still roll on every message as indicated by the comment?

Sriharsha Chintalapani wrote:

    yes it rolls on every messageset. I can add an assert to test that if required.

Will it? In each append, we add 2 messages with a total of 10 bytes. If we add a 10 byte per message overhead, with compression, it seems both message sets can fit in the same log segment of 100 bytes?

- Jun

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26346/#review55720
-----------------------------------------------------------

On Oct. 7, 2014, 8:49 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26346/ --- (Updated Oct. 7, 2014, 8:49 p.m.) Review request for kafka. Bugs: KAFKA-1670 https://issues.apache.org/jira/browse/KAFKA-1670 Repository: kafka Description --- KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. Diffs - core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala PRE-CREATION core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/test/scala/unit/kafka/log/LogManagerTest.scala 59bd8a981b3fb8595dd6e790a30071092978a88d core/src/test/scala/unit/kafka/log/LogTest.scala 577d102fc2eb6bb1a72326141ecd431db6d66f04 core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9556ed92c61ffee5423be962bcdbe64c71e1f2fa Diff: https://reviews.apache.org/r/26346/diff/ Testing --- Thanks, Sriharsha Chintalapani
Re: Security JIRAS
I think we need to add: * Authentication of Kafka brokers with a secured ZooKeeper * Kafka should be able to generate delegation tokens for MapReduce / Spark / Yarn jobs. * Extend the systest framework to allow testing secured Kafka Gwen On Tue, Oct 7, 2014 at 5:15 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, As promised, I added a tree of JIRAs for the stuff in the security wiki ( https://cwiki.apache.org/confluence/display/KAFKA/Security): https://issues.apache.org/jira/browse/KAFKA-1682 I tried to break it into reasonably standalone pieces. I think many of the tickets could actually be done in parallel. Since there were many people interested in this area this may help parallelize the work a bit. I added some strawman details on implementation to each ticket. We can discuss and refine further on the individual tickets. Please take a look and let me know if this breakdown seems reasonable. Cheers, -Jay
Re: Kafka AWS deployment + UI console
I'm using Hue's ZooKeeper app: http://gethue.com/new-zookeeper-browser-app/ This UI looks very cute, but I haven't tried it yet: https://github.com/claudemamo/kafka-web-console Gwen On Tue, Oct 7, 2014 at 5:08 PM, Shafaq s.abdullah...@gmail.com wrote: We are going to deploy Kafka in production and also monitor it via a console (e.g. state of partitions in the broker - leader and slaves, state of consumers). Is there an out-of-the-box solution? What is the best and most efficient way of deployment and monitoring? Has someone tried this? It looks promising: http://www.michael-noll.com/blog/2014/03/17/wirbelsturm-one-click-deploy-storm-kafka-clusters-with-vagrant-puppet/ -- Kind Regards, Shafaq
Re: Kafka AWS deployment + UI console
We use http://quantifind.com/KafkaOffsetMonitor/... On Tue, Oct 7, 2014 at 8:49 PM, Gwen Shapira gshap...@cloudera.com wrote: I'm using Hue's ZooKeeper app: http://gethue.com/new-zookeeper-browser-app/ This UI looks very cute, but I haven't tried it yet: https://github.com/claudemamo/kafka-web-console Gwen On Tue, Oct 7, 2014 at 5:08 PM, Shafaq s.abdullah...@gmail.com wrote: We are going to deploy Kafka in production and also monitor it via a console (e.g. state of partitions in the broker - leader and slaves, state of consumers). Is there an out-of-the-box solution? What is the best and most efficient way of deployment and monitoring? Has someone tried this? It looks promising: http://www.michael-noll.com/blog/2014/03/17/wirbelsturm-one-click-deploy-storm-kafka-clusters-with-vagrant-puppet/ -- Kind Regards, Shafaq
Re: Review Request 26346: Patch for KAFKA-1670
On Oct. 7, 2014, 11:03 p.m., Jun Rao wrote: core/src/test/scala/unit/kafka/log/LogTest.scala, line 242 https://reviews.apache.org/r/26346/diff/5/?file=714849#file714849line242 By increasing the segment size to 100, does the log still roll on every message as indicated by the comment? Sriharsha Chintalapani wrote: Yes, it rolls on every message set. I can add an assert to test that, if required. Jun Rao wrote: Will it? In each append, we add 2 messages with a total of 10 bytes. If we add a 10-byte per-message overhead, with compression, it seems both message sets can fit in the same log segment of 100 bytes? I am using validMessages.sizeInBytes, which reports the size of new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)) as 83 - Sriharsha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26346/#review55720 --- On Oct. 7, 2014, 8:49 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26346/ --- (Updated Oct. 7, 2014, 8:49 p.m.) Review request for kafka. Bugs: KAFKA-1670 https://issues.apache.org/jira/browse/KAFKA-1670 Repository: kafka Description --- KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.
Diffs - core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala PRE-CREATION core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/test/scala/unit/kafka/log/LogManagerTest.scala 59bd8a981b3fb8595dd6e790a30071092978a88d core/src/test/scala/unit/kafka/log/LogTest.scala 577d102fc2eb6bb1a72326141ecd431db6d66f04 core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9556ed92c61ffee5423be962bcdbe64c71e1f2fa Diff: https://reviews.apache.org/r/26346/diff/ Testing --- Thanks, Sriharsha Chintalapani
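The back-of-envelope arithmetic behind this exchange can be checked in a few lines of Java (a sketch, not Kafka code; the numbers come from the thread: an 83-byte compressed message set against a 100-byte segment limit):

```java
public class RollArithmetic {
    public static void main(String[] args) {
        int segmentSize = 100;   // segment.bytes used in the test
        int messageSetSize = 83; // validMessages.sizeInBytes reported above
        // The first set fits, but appending a second would exceed the limit,
        // so the log rolls on every message set, as the test comment claims.
        System.out.println(messageSetSize <= segmentSize);    // true
        System.out.println(2 * messageSetSize > segmentSize); // true
    }
}
```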
[jira] [Updated] (KAFKA-1644) Inherit FetchResponse from RequestOrResponse
[ https://issues.apache.org/jira/browse/KAFKA-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1644: --- Resolution: Fixed Fix Version/s: 0.8.3 Status: Resolved (was: Patch Available) Thanks for the latest patch. +1 and committed to trunk. Inherit FetchResponse from RequestOrResponse Key: KAFKA-1644 URL: https://issues.apache.org/jira/browse/KAFKA-1644 Project: Kafka Issue Type: Improvement Reporter: Anton Karamanov Assignee: Anton Karamanov Fix For: 0.8.3 Attachments: 0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch, 0002-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch, 0003-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch Unlike all other Kafka API responses, {{FetchResponse}} is not a subclass of RequestOrResponse, which requires handling it as a special case while processing responses. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26346: Patch for KAFKA-1670
On Oct. 7, 2014, 11:03 p.m., Jun Rao wrote: core/src/test/scala/unit/kafka/log/LogTest.scala, line 242 https://reviews.apache.org/r/26346/diff/5/?file=714849#file714849line242 By increasing the segment size to 100, does the log still roll on every message as indicated by the comment? Sriharsha Chintalapani wrote: Yes, it rolls on every message set. I can add an assert to test that, if required. Jun Rao wrote: Will it? In each append, we add 2 messages with a total of 10 bytes. If we add a 10-byte per-message overhead, with compression, it seems both message sets can fit in the same log segment of 100 bytes? Sriharsha Chintalapani wrote: I am using validMessages.sizeInBytes, which reports the size of new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)) as 83 Great. Then, this is fine. - Jun --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26346/#review55720 --- On Oct. 7, 2014, 8:49 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26346/ --- (Updated Oct. 7, 2014, 8:49 p.m.) Review request for kafka. Bugs: KAFKA-1670 https://issues.apache.org/jira/browse/KAFKA-1670 Repository: kafka Description --- KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.
Diffs - core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala PRE-CREATION core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/test/scala/unit/kafka/log/LogManagerTest.scala 59bd8a981b3fb8595dd6e790a30071092978a88d core/src/test/scala/unit/kafka/log/LogTest.scala 577d102fc2eb6bb1a72326141ecd431db6d66f04 core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9556ed92c61ffee5423be962bcdbe64c71e1f2fa Diff: https://reviews.apache.org/r/26346/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Updated] (KAFKA-1670) Corrupt log files for segment.bytes values close to Int.MaxInt
[ https://issues.apache.org/jira/browse/KAFKA-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani updated KAFKA-1670: -- Attachment: KAFKA-1670_2014-10-07_18:39:31.patch Corrupt log files for segment.bytes values close to Int.MaxInt -- Key: KAFKA-1670 URL: https://issues.apache.org/jira/browse/KAFKA-1670 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Sriharsha Chintalapani Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1670.patch, KAFKA-1670_2014-10-04_20:17:46.patch, KAFKA-1670_2014-10-06_09:48:25.patch, KAFKA-1670_2014-10-07_13:39:13.patch, KAFKA-1670_2014-10-07_13:49:10.patch, KAFKA-1670_2014-10-07_18:39:31.patch The maximum value for the topic-level config {{segment.bytes}} is {{Int.MaxInt}} (2147483647). *Using this value causes brokers to corrupt their log files, leaving them unreadable.* We set {{segment.bytes}} to {{2122317824}} which is well below the maximum. One by one, the ISR of all partitions shrunk to 1. Brokers would crash when restarted, attempting to read from a negative offset in a log file. After discovering that many segment files had grown to 4GB or more, we were forced to shut down our *entire production Kafka cluster* for several hours while we split all segment files into 1GB chunks. Looking into the {{kafka.log}} code, the {{segment.bytes}} parameter is used inconsistently. It is treated as a *soft* maximum for the size of the segment file (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/LogConfig.scala#L26) with logs rolled only after (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/Log.scala#L246) they exceed this value. However, much of the code that deals with log files uses *ints* to store the size of the file and the position in the file. Overflow of these ints leads the broker to append to the segments indefinitely, and to fail to read these segments for consuming or recovery. 
This is trivial to reproduce: {code} $ bin/kafka-topics.sh --topic segment-bytes-test --create --replication-factor 2 --partitions 1 --zookeeper zkhost:2181 $ bin/kafka-topics.sh --topic segment-bytes-test --alter --config segment.bytes=2147483647 --zookeeper zkhost:2181 $ yes "Int.MaxValue is a ridiculous bound on file size in 2014" | bin/kafka-console-producer.sh --broker-list localhost:6667 zkhost:2181 --topic segment-bytes-test {code} After running for a few minutes, the log file is corrupt: {code} $ ls -lh data/segment-bytes-test-0/ total 9.7G -rw-r--r-- 1 root root 10M Oct 3 19:39 .index -rw-r--r-- 1 root root 9.7G Oct 3 19:39 .log {code} We recovered the data from the log files using a simple Python script: https://gist.github.com/also/9f823d9eb9dc0a410796 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
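The failure mode in this report comes down to int overflow. A minimal Java sketch (illustrative only, not Kafka's actual code) shows how a file position that has grown past Int.MaxValue wraps negative when stored in an int, matching the "negative offset" crash described above:

```java
public class IntOverflowDemo {
    public static void main(String[] args) {
        // segment.bytes is only a soft limit, so the file can keep growing:
        long realPosition = (long) Integer.MAX_VALUE + 100;

        // Code that tracks the position in an int silently wraps negative:
        int storedPosition = (int) realPosition;
        System.out.println(storedPosition); // -2147483549

        // Reading from this "offset" then fails, crashing the broker on restart.
    }
}
```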
Re: Review Request 26346: Patch for KAFKA-1670
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26346/ --- (Updated Oct. 8, 2014, 1:39 a.m.) Review request for kafka. Bugs: KAFKA-1670 https://issues.apache.org/jira/browse/KAFKA-1670 Repository: kafka Description (updated) --- KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. Diffs (updated) - clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f420ad63406ee2a2fde9435762ae027d85f3 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala PRE-CREATION core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/test/scala/unit/kafka/log/LogManagerTest.scala 59bd8a981b3fb8595dd6e790a30071092978a88d core/src/test/scala/unit/kafka/log/LogTest.scala 577d102fc2eb6bb1a72326141ecd431db6d66f04 core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9556ed92c61ffee5423be962bcdbe64c71e1f2fa Diff: https://reviews.apache.org/r/26346/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Commented] (KAFKA-1670) Corrupt log files for segment.bytes values close to Int.MaxInt
[ https://issues.apache.org/jira/browse/KAFKA-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14162918#comment-14162918 ] Sriharsha Chintalapani commented on KAFKA-1670: --- Updated reviewboard https://reviews.apache.org/r/26346/diff/ against branch origin/trunk Corrupt log files for segment.bytes values close to Int.MaxInt -- Key: KAFKA-1670 URL: https://issues.apache.org/jira/browse/KAFKA-1670 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Sriharsha Chintalapani Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1670.patch, KAFKA-1670_2014-10-04_20:17:46.patch, KAFKA-1670_2014-10-06_09:48:25.patch, KAFKA-1670_2014-10-07_13:39:13.patch, KAFKA-1670_2014-10-07_13:49:10.patch, KAFKA-1670_2014-10-07_18:39:31.patch The maximum value for the topic-level config {{segment.bytes}} is {{Int.MaxInt}} (2147483647). *Using this value causes brokers to corrupt their log files, leaving them unreadable.* We set {{segment.bytes}} to {{2122317824}} which is well below the maximum. One by one, the ISR of all partitions shrunk to 1. Brokers would crash when restarted, attempting to read from a negative offset in a log file. After discovering that many segment files had grown to 4GB or more, we were forced to shut down our *entire production Kafka cluster* for several hours while we split all segment files into 1GB chunks. Looking into the {{kafka.log}} code, the {{segment.bytes}} parameter is used inconsistently. It is treated as a *soft* maximum for the size of the segment file (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/LogConfig.scala#L26) with logs rolled only after (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/Log.scala#L246) they exceed this value. However, much of the code that deals with log files uses *ints* to store the size of the file and the position in the file. 
Overflow of these ints leads the broker to append to the segments indefinitely, and to fail to read these segments for consuming or recovery. This is trivial to reproduce: {code} $ bin/kafka-topics.sh --topic segment-bytes-test --create --replication-factor 2 --partitions 1 --zookeeper zkhost:2181 $ bin/kafka-topics.sh --topic segment-bytes-test --alter --config segment.bytes=2147483647 --zookeeper zkhost:2181 $ yes Int.MaxValue is a ridiculous bound on file size in 2014 | bin/kafka-console-producer.sh --broker-list localhost:6667 zkhost:2181 --topic segment-bytes-test {code} After running for a few minutes, the log file is corrupt: {code} $ ls -lh data/segment-bytes-test-0/ total 9.7G -rw-r--r-- 1 root root 10M Oct 3 19:39 .index -rw-r--r-- 1 root root 9.7G Oct 3 19:39 .log {code} We recovered the data from the log files using a simple Python script: https://gist.github.com/also/9f823d9eb9dc0a410796 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/ --- (Updated Oct. 8, 2014, 1:46 a.m.) Review request for kafka. Changes --- Minor fixes based on Jun's feedback. Repository: kafka Description --- KAFKA-1555: provide strong consistency with reasonable availability Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java f9de4af clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java addc906 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 core/src/main/scala/kafka/cluster/Partition.scala ff106b4 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala PRE-CREATION core/src/main/scala/kafka/common/NotEnoughReplicasException.scala PRE-CREATION core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 core/src/main/scala/kafka/producer/SyncProducerConfig.scala 69b2d0c core/src/main/scala/kafka/server/KafkaApis.scala c584b55 core/src/main/scala/kafka/server/KafkaConfig.scala 165c816 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 39f777b core/src/test/scala/unit/kafka/producer/ProducerTest.scala dd71d81 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c Diff: https://reviews.apache.org/r/25886/diff/ Testing --- With 3 broker cluster, created 3 topics each with 1 partition and 3 replicas, with 1,3 and 4 min.insync.replicas. 
* min.insync.replicas=1 behaved normally (all writes succeeded as long as a broker was up) * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and one broker was down * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1 See notes about retry behavior in the JIRA. Thanks, Gwen Shapira
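The test matrix above boils down to a single predicate. This is an illustrative sketch (names are made up, not the actual Partition.scala logic): an acks=-1 append is rejected with NotEnoughReplicas when the ISR has fewer members than min.insync.replicas:

```java
public class MinIsrCheck {
    // Illustrative predicate, not Kafka's actual code.
    static boolean rejectAppend(int isrSize, int minInsyncReplicas, int requiredAcks) {
        return requiredAcks == -1 && isrSize < minInsyncReplicas;
    }

    public static void main(String[] args) {
        // 3-broker cluster with one broker down, so the ISR has 2 members:
        System.out.println(rejectAppend(2, 1, -1)); // false: writes succeed
        System.out.println(rejectAppend(2, 3, -1)); // true: NotEnoughReplicas
        System.out.println(rejectAppend(2, 4, -1)); // true: NotEnoughReplicas
    }
}
```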
Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability
On Oct. 8, 2014, midnight, Jun Rao wrote: core/src/main/scala/kafka/cluster/Partition.scala, line 287 https://reviews.apache.org/r/25886/diff/8/?file=714240#file714240line287 The error code should be NotEnoughReplicasAfterAppendCode. Good catch. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/#review55730 --- On Oct. 8, 2014, 1:46 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/ --- (Updated Oct. 8, 2014, 1:46 a.m.) Review request for kafka. Repository: kafka Description --- KAFKA-1555: provide strong consistency with reasonable availability Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java f9de4af clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java addc906 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 core/src/main/scala/kafka/cluster/Partition.scala ff106b4 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala PRE-CREATION core/src/main/scala/kafka/common/NotEnoughReplicasException.scala PRE-CREATION core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 core/src/main/scala/kafka/producer/SyncProducerConfig.scala 69b2d0c core/src/main/scala/kafka/server/KafkaApis.scala c584b55 core/src/main/scala/kafka/server/KafkaConfig.scala 165c816 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 39f777b core/src/test/scala/unit/kafka/producer/ProducerTest.scala dd71d81 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c Diff: https://reviews.apache.org/r/25886/diff/ Testing --- 
With 3 broker cluster, created 3 topics each with 1 partition and 3 replicas, with 1,3 and 4 min.insync.replicas. * min.insync.replicas=1 behaved normally (all writes succeeded as long as a broker was up) * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and one broker was down * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1 See notes about retry behavior in the JIRA. Thanks, Gwen Shapira
Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/#review55745 --- clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java https://reviews.apache.org/r/25886/#comment96135 Where is this used? core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/25886/#comment96138 may be worth clarifying the comment - i.e., in this particular scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk. core/src/main/scala/kafka/log/LogConfig.scala https://reviews.apache.org/r/25886/#comment96149 We can validate though at the time the topic is created or altered right? i.e., in the admin utils although that does move a portion of the log config validation outside. - Joel Koshy On Oct. 8, 2014, 1:46 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/ --- (Updated Oct. 8, 2014, 1:46 a.m.) Review request for kafka. 
Repository: kafka Description --- KAFKA-1555: provide strong consistency with reasonable availability Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java f9de4af clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java addc906 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 core/src/main/scala/kafka/cluster/Partition.scala ff106b4 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala PRE-CREATION core/src/main/scala/kafka/common/NotEnoughReplicasException.scala PRE-CREATION core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 core/src/main/scala/kafka/producer/SyncProducerConfig.scala 69b2d0c core/src/main/scala/kafka/server/KafkaApis.scala c584b55 core/src/main/scala/kafka/server/KafkaConfig.scala 165c816 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 39f777b core/src/test/scala/unit/kafka/producer/ProducerTest.scala dd71d81 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c Diff: https://reviews.apache.org/r/25886/diff/ Testing --- With 3 broker cluster, created 3 topics each with 1 partition and 3 replicas, with 1,3 and 4 min.insync.replicas. * min.insync.replicas=1 behaved normally (all writes succeeded as long as a broker was up) * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and one broker was down * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1 See notes about retry behavior in the JIRA. Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14162942#comment-14162942 ] Joel Koshy commented on KAFKA-1555: --- +1 (took a quick look at it) We could improve the retry handling on the producer in a separate jira - i.e., avoid unnecessarily sending duplicates as described in the earlier comment. provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch In a mission critical application, we expect that a Kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens. We found that the current Kafka version (0.8.1.1) cannot meet these requirements due to three behaviors: 1. When choosing a new leader from 2 followers in the ISR, the one with fewer messages may be chosen as the leader. 2. Even when replica.lag.max.messages=0, a follower can stay in the ISR when it has fewer messages than the leader. 3. The ISR can contain only 1 broker; therefore acknowledged messages may be stored on only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in the ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2.
Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in the ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from the ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, no existing configuration can satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
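The acks=2 case in the proof above can be modeled as a toy simulation (entirely illustrative; the broker names and data model are made up, not Kafka code): C lags but stays in the ISR, A dies, C becomes leader, and the acknowledged message m is lost:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AcksTwoLoss {
    public static void main(String[] args) {
        Map<String, List<String>> replicas = new LinkedHashMap<>();
        replicas.put("A", new ArrayList<>(List.of("m"))); // leader, acked m
        replicas.put("B", new ArrayList<>(List.of("m"))); // follower, acked m
        replicas.put("C", new ArrayList<>());             // behind, but still in ISR

        replicas.remove("A"); // leader A is killed

        // Per behavior (1) above, C may be elected leader despite lagging:
        List<String> newLeaderLog = replicas.get("C");
        System.out.println(newLeaderLog.contains("m")); // false: consumers miss m
    }
}
```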
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14162987#comment-14162987 ] xueqiang wang commented on KAFKA-1646: -- Yes, with many deletions and creations, Windows does not handle preallocation well. We have run a cluster for more than a month, and found that if a consumer reads historical logs (such as from the earliest offset), performance drops to only 40% of what it was a month ago. With the fix, performance stays stable. Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be contiguous on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.isWindows)' or adds methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
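The preallocation idea the patch describes can be sketched with java.io.RandomAccessFile (a hedged illustration; the file name and size are made up, and the real patch guards this behind an Os.isWindows-style check): reserving the full segment size up front lets NTFS keep the file contiguous, so sequential consumer reads stay fast:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class PreallocateSketch {
    public static void main(String[] args) throws IOException {
        File segment = File.createTempFile("demo-segment", ".log");
        segment.deleteOnExit();

        long segmentBytes = 1024 * 1024; // 1 MB for the demo; real segments are larger
        try (RandomAccessFile raf = new RandomAccessFile(segment, "rw")) {
            // Reserve the full segment size when the file is created, instead
            // of letting it grow write by write (which fragments on NTFS).
            raf.setLength(segmentBytes);
        }
        System.out.println(segment.length()); // 1048576
    }
}
```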