Re: What to do when file.rename fails?
Having looked at the logs the user posted, I don't think this specific issue has to do with the /tmp path. However, now that the /tmp path is being discussed, I think it's a good idea that we default the Kafka logs to a certain folder. As Jay notes, it makes it very easy to just download and start the servers without having to fiddle with the configs when you are just starting out. Having said that, when I started out with Kafka, I found /tmp an odd place to default the path to. I expected it to default to a folder within the Kafka install, somewhere like KAFKA_INSTALL_FOLDER/data/kafka-logs/. Is that something we should do? -Jaikiran

On Monday 26 January 2015 12:23 AM, Jay Kreps wrote: Hmm, but I don't think /tmp gets cleaned while the server is running... The reason for using /tmp was that we don't know which directory they will use and we don't want them to have to edit configuration for the simple out-of-the-box getting-started tutorial. I actually do think that is important. Maybe an intermediate step we could do is just call out this setting in the quickstart so people know where data is going and know they need to configure it later... -Jay

On Sun, Jan 25, 2015 at 9:32 AM, Joe Stein joe.st...@stealth.ly wrote: This feels like another type of symptom of people using /tmp/ for their logs. Personally, I would rather use /mnt/data or something, and if that doesn't exist on their machine we can raise an exception, or have no default and force it to be set. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Jan 25, 2015 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote: I think you are right, good catch. It could be that this user deleted the files manually, but I wonder if there isn't some way that this is a Kafka bug--e.g. if multiple types of retention policies kick in at the same time, do we synchronize that properly?
-Jay

On Sat, Jan 24, 2015 at 9:26 PM, Jaikiran Pai jai.forums2...@gmail.com wrote: Hi Jay, I spent some more time on this today and went back to the original thread which brought up the issue with file leaks [1]. I think the lsof output in those logs has a very important hint:

java 8446 root 725u REG 253,2 536910838 26087364 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/68818668.log (deleted)
java 8446 root 726u REG 253,2 536917902 26087368 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/69457098.log (deleted)

Notice the "(deleted)" text in that output. The last time I looked at that output, I thought it was the user who had added the "deleted" text to help us understand the problem. But today I read up on the output format of lsof, and it turns out that lsof itself adds that hint whenever a file has already been deleted (possibly by a different process) while some other process is still holding open resources on that deleted file [2].

So in the context of the issue we are discussing, and the way Kafka deals with async deletes (i.e. first by attempting a rename of the log/index files), I think this all makes sense now. What I think is happening is: some (other?) process (not sure what or why) has already deleted the log file that Kafka is using for the LogSegment. The LogSegment, however, still has an open FileChannel resource on that deleted file (which is why the open file descriptor is held on to and shows up in that output). Now Kafka, at some point in time, triggers an async delete of the LogSegment, which involves a rename of that (already deleted) log file. The rename fails (because the original file path isn't there anymore). As a result, we end up throwing that "failed to rename" KafkaStorageException and thus leave the open FileChannel open forever (till the Kafka process exits).
So I think we should: 1) Find out what deletes the underlying log file(s), and why. I'll add a reply to the original mail discussion asking the user if he can provide more details. 2) Handle this case and close the FileChannel. The patch that's been uploaded to the review board https://reviews.apache.org/r/29755/ does that. The immediate delete on failure to rename involves (safely) closing the open FileChannel and (safely) deleting the (possibly non-existent) file.

By the way, this entire thing can be easily reproduced by running the following program, which first creates a file and opens a FileChannel to that file, then waits for the user to delete that file externally (I used the rm command), and then goes and tries to rename that deleted file, which fails. In between each of these steps, you can run the lsof command externally to see the open file resources (I used 'lsof | grep test.log'): public static void main(String[] args)
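Jaikiran's reproduction program is cut off above. A minimal self-contained sketch of the same steps (hypothetical code written for this digest, not the original program or the patch) would look like this:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

public class RenameAfterDeleteRepro {
    // Create a file, open a FileChannel on it, delete the file out from
    // under the channel (simulating the external "rm"), then try to rename
    // it the way Kafka's async delete does. Returns whether the rename worked.
    public static boolean reproduce() {
        try {
            File log = File.createTempFile("test", ".log");
            RandomAccessFile raf = new RandomAccessFile(log, "rw");
            FileChannel channel = raf.getChannel();
            log.delete(); // external deletion; lsof would now show "(deleted)"
            boolean renamed = log.renameTo(
                new File(log.getParentFile(), log.getName() + ".deleted"));
            channel.close(); // the proposed fix: close even when rename fails
            raf.close();
            return renamed;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("rename succeeded: " + reproduce());
    }
}
```

On Linux the rename fails (File.renameTo returns false) because the source path no longer exists, while the still-open FileChannel keeps the file descriptor alive until it is closed.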
Re: What to do when file.rename fails?
We probably can default the log dir to a relative path, something like ../kafka-logs. As for I/O errors on rename, I agree that we probably should just shut down the broker since it's not expected to happen. Thanks, Jun

On Mon, Jan 26, 2015 at 5:54 AM, Jaikiran Pai jai.forums2...@gmail.com wrote: Having looked at the logs the user posted, I don't think this specific issue has to do with the /tmp path. However, now that the /tmp path is being discussed, I think it's a good idea that we default the Kafka logs to a certain folder. As Jay notes, it makes it very easy to just download and start the servers without having to fiddle with the configs when you are just starting out. Having said that, when I started out with Kafka, I found /tmp an odd place to default the path to. I expected it to default to a folder within the Kafka install, somewhere like KAFKA_INSTALL_FOLDER/data/kafka-logs/. Is that something we should do? -Jaikiran

On Monday 26 January 2015 12:23 AM, Jay Kreps wrote: Hmm, but I don't think /tmp gets cleaned while the server is running... The reason for using /tmp was that we don't know which directory they will use and we don't want them to have to edit configuration for the simple out-of-the-box getting-started tutorial. I actually do think that is important. Maybe an intermediate step we could do is just call out this setting in the quickstart so people know where data is going and know they need to configure it later... -Jay

On Sun, Jan 25, 2015 at 9:32 AM, Joe Stein joe.st...@stealth.ly wrote: This feels like another type of symptom of people using /tmp/ for their logs. Personally, I would rather use /mnt/data or something, and if that doesn't exist on their machine we can raise an exception, or have no default and force it to be set. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Jan 25, 2015 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote: I think you are right, good catch.
It could be that this user deleted the files manually, but I wonder if there isn't some way that this is a Kafka bug--e.g. if multiple types of retention policies kick in at the same time, do we synchronize that properly? -Jay

On Sat, Jan 24, 2015 at 9:26 PM, Jaikiran Pai jai.forums2...@gmail.com wrote: Hi Jay, I spent some more time on this today and went back to the original thread which brought up the issue with file leaks [1]. I think the lsof output in those logs has a very important hint:

java 8446 root 725u REG 253,2 536910838 26087364 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/68818668.log (deleted)
java 8446 root 726u REG 253,2 536917902 26087368 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/69457098.log (deleted)

Notice the "(deleted)" text in that output. The last time I looked at that output, I thought it was the user who had added the "deleted" text to help us understand the problem. But today I read up on the output format of lsof, and it turns out that lsof itself adds that hint whenever a file has already been deleted (possibly by a different process) while some other process is still holding open resources on that deleted file [2].

So in the context of the issue we are discussing, and the way Kafka deals with async deletes (i.e. first by attempting a rename of the log/index files), I think this all makes sense now. What I think is happening is: some (other?) process (not sure what or why) has already deleted the log file that Kafka is using for the LogSegment. The LogSegment, however, still has an open FileChannel resource on that deleted file (which is why the open file descriptor is held on to and shows up in that output). Now Kafka, at some point in time, triggers an async delete of the LogSegment, which involves a rename of that (already deleted) log file. The rename fails (because the original file path isn't there anymore).
As a result, we end up throwing that "failed to rename" KafkaStorageException and thus leave the open FileChannel open forever (till the Kafka process exits). So I think we should: 1) Find out what deletes the underlying log file(s), and why. I'll add a reply to the original mail discussion asking the user if he can provide more details. 2) Handle this case and close the FileChannel. The patch that's been uploaded to the review board https://reviews.apache.org/r/29755/ does that. The immediate delete on failure to rename involves (safely) closing the open FileChannel and (safely) deleting the (possibly non-existent) file. By the way, this entire thing can be easily reproduced by running the following program which first
[jira] [Commented] (KAFKA-1889) Refactor shell wrapper scripts
[ https://issues.apache.org/jira/browse/KAFKA-1889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14291967#comment-14291967 ] Francois Saint-Jacques commented on KAFKA-1889: --- Do you think it could make it to 0.8.2? Refactor shell wrapper scripts -- Key: KAFKA-1889 URL: https://issues.apache.org/jira/browse/KAFKA-1889 Project: Kafka Issue Type: Improvement Components: packaging Reporter: Francois Saint-Jacques Assignee: Francois Saint-Jacques Priority: Minor Attachments: refactor-scripts-v1.patch, refactor-scripts-v2.patch Shell scripts in bin/ need love. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1507) Using GetOffsetShell against non-existent topic creates the topic unintentionally
[ https://issues.apache.org/jira/browse/KAFKA-1507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14292220#comment-14292220 ] Jason Rosenberg commented on KAFKA-1507: I think relegating topic creation to an admin client would be very limiting. It's extremely useful to have a self-service system where new applications can just create a new topic on demand (with reasonable defaults), without the need for an admin to come in and prepare topics ahead of a code release (leave that to DBAs managing transactional databases!). I do like the idea of an automatic create-topic request from a producer, in response to a topic-not-found exception, rather than auto-creating topics from metadata requests (which happen asynchronously and usually cause the initial metadata request to fail!). Consumers should never create a topic, I should think.

Using GetOffsetShell against non-existent topic creates the topic unintentionally - Key: KAFKA-1507 URL: https://issues.apache.org/jira/browse/KAFKA-1507 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Environment: centos Reporter: Luke Forehand Assignee: Sriharsha Chintalapani Priority: Minor Labels: newbie Attachments: KAFKA-1507.patch, KAFKA-1507.patch, KAFKA-1507_2014-07-22_10:27:45.patch, KAFKA-1507_2014-07-23_17:07:20.patch, KAFKA-1507_2014-08-12_18:09:06.patch, KAFKA-1507_2014-08-22_11:06:38.patch, KAFKA-1507_2014-08-22_11:08:51.patch

A typo when using the GetOffsetShell command can cause a topic to be created which cannot be deleted (because deletion is still in progress):

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka10:9092,kafka11:9092,kafka12:9092,kafka13:9092 --topic typo --time 1
./kafka-topics.sh --zookeeper stormqa1/kafka-prod --describe --topic typo
Topic:typo PartitionCount:8 ReplicationFactor:1 Configs:
Topic: typo Partition: 0 Leader: 10 Replicas: 10 Isr: 10
...

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1792) change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments
[ https://issues.apache.org/jira/browse/KAFKA-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14292355#comment-14292355 ] Dmitry Pekar edited comment on KAFKA-1792 at 1/26/15 8:26 PM: -- [~nehanarkhede] thanks for your comments. Please see the answers below: 1. As I've understood from previous discussions, we should preserve more or less backward compatibility with the old --generate. I even think that renaming to --rebalance was a bad idea. The generate name was consistent in semantics with --execute and --verify. The scenario was --generate/--execute/--verify. If we rename --generate to --rebalance, the naming becomes inconsistent. Also, if we remove the -broker-list/-topics options from --rebalance, that would restrict use cases of the CLI, so IMHO removing those options is destructive. 2. Currently --decomission-broker doesn't require additional options except the broker id. 3. I think that -broker-list/-topics should still be specified to --rebalance/--generate as described in 1. Sure, we could provide a better description of the reassignment configuration if required. We need to negotiate what exactly should be printed and whether it is possible to determine that information from the broker registry stored in ZK. 4. Yes. When working on this I was under the assumption that the changes to this CLI would be minor. If required, we can still design a completely new CLI (or heavily change the existing one) with the required set of commands/options, but in that case, IMHO, we should develop and negotiate a design doc first. [~charmalloc] I think that we need to decide on further steps for fixing/implementing the CLI changes. Your expertise is required to make those decisions. was (Author: dmitry pekar): [~nehanarkhede] thanks for your comments. Please see the answers below: 1. As I've understood from previous discussions, we should preserve more or less backward compatibility with the old --generate. I even think that renaming to --rebalance was a bad idea.
The generate name was consistent in semantics with --execute and --verify. The scenario was --generate/--execute/--verify. If we rename --generate to --rebalance, the naming becomes inconsistent. Also, if we remove the -broker-list/-topics options from --rebalance, that would restrict use cases of the CLI, so IMHO removing those options is destructive. 2. Currently --decomission-broker doesn't require additional options except the broker id. 3. I think that -broker-list/-topics should still be specified to --rebalance/--generate as described in 1. Sure, we could provide a better description of the reassignment configuration if required. We need to negotiate what exactly should be printed and whether it is possible to determine that information from the broker registry stored in ZK. 4. Yes. When working on this I was under the assumption that the changes to this CLI would be minor. If required, we can still design a completely new CLI (or heavily change the existing one) with the required set of commands/options, but in that case, IMHO, we should develop and negotiate a design doc first. [~charmalloc] I think that we need to decide on further steps for fixing/implementing the CLI changes. Your expertise is required to make those decisions.

change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments - Key: KAFKA-1792 URL: https://issues.apache.org/jira/browse/KAFKA-1792 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 Attachments: KAFKA-1792.patch, KAFKA-1792_2014-12-03_19:24:56.patch, KAFKA-1792_2014-12-08_13:42:43.patch, KAFKA-1792_2014-12-19_16:48:12.patch, KAFKA-1792_2015-01-14_12:54:52.patch, generate_alg_tests.txt, rebalance_use_cases.txt

Current implementation produces fair replica distribution between the specified list of brokers. Unfortunately, it doesn't take into account the current replica assignment.
So if we have, for instance, 3 brokers id=[0..2] and are going to add a fourth broker id=3, --generate will create an assignment config which will redistribute replicas fairly across brokers [0..3] as if those partitions were created from scratch. It will not take into consideration the current replica assignment and accordingly will not try to minimize the number of replica moves between brokers. As proposed by [~charmalloc], this should be improved. The new output of the improved --generate algorithm should suit the following requirements: - fairness of replica distribution - every broker will have R or R+1 replicas assigned; - minimum of reassignments - number of replica moves
[jira] [Commented] (KAFKA-1792) change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments
[ https://issues.apache.org/jira/browse/KAFKA-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14292355#comment-14292355 ] Dmitry Pekar commented on KAFKA-1792: - [~nehanarkhede] thanks for your comments. Please see the answers below:

1. As I've understood from previous discussions, we should preserve more or less backward compatibility with the old --generate. I even think that renaming to --rebalance was a bad idea. The generate name was consistent in semantics with --execute and --verify. The scenario was --generate/--execute/--verify. If we rename --generate to --rebalance, the naming becomes inconsistent. Also, if we remove the -broker-list/-topics options from --rebalance, that would restrict use cases of the CLI, so IMHO removing those options is destructive.

2. Currently --decomission-broker doesn't require additional options except the broker id.

3. I think that -broker-list/-topics should still be specified to --rebalance/--generate as described in 1. Sure, we could provide a better description of the reassignment configuration if required. We need to negotiate what exactly should be printed and whether it is possible to determine that information from the broker registry stored in ZK.

4. Yes. When working on this I was under the assumption that the changes to this CLI would be minor. If required, we can still design a completely new CLI (or heavily change the existing one) with the required set of commands/options, but in that case, IMHO, we should develop and negotiate a design doc first.

[~charmalloc] I think that we need to decide on further steps for fixing/implementing the CLI changes. Your expertise is required to make those decisions.
change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments - Key: KAFKA-1792 URL: https://issues.apache.org/jira/browse/KAFKA-1792 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 Attachments: KAFKA-1792.patch, KAFKA-1792_2014-12-03_19:24:56.patch, KAFKA-1792_2014-12-08_13:42:43.patch, KAFKA-1792_2014-12-19_16:48:12.patch, KAFKA-1792_2015-01-14_12:54:52.patch, generate_alg_tests.txt, rebalance_use_cases.txt

Current implementation produces fair replica distribution between the specified list of brokers. Unfortunately, it doesn't take into account the current replica assignment. So if we have, for instance, 3 brokers id=[0..2] and are going to add a fourth broker id=3, --generate will create an assignment config which will redistribute replicas fairly across brokers [0..3] as if those partitions were created from scratch. It will not take into consideration the current replica assignment and accordingly will not try to minimize the number of replica moves between brokers. As proposed by [~charmalloc], this should be improved. The new output of the improved --generate algorithm should suit the following requirements:
- fairness of replica distribution - every broker will have R or R+1 replicas assigned;
- minimum of reassignments - number of replica moves between brokers will be minimal;

Example. Consider the following replica distribution per brokers [0..3] (we just added brokers 2 and 3):
- broker - 0, 1, 2, 3
- replicas - 7, 6, 0, 0

The new algorithm will produce the following assignment:
- broker - 0, 1, 2, 3
- replicas - 4, 3, 3, 3
- moves - -3, -3, +3, +3

It will be fair, and the number of moves will be 6, which is minimal for the specified initial distribution.
The scope of this issue is:
- design an algorithm matching the above requirements;
- implement this algorithm and unit tests;
- test it manually using different initial assignments;

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
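The worked example in the issue description (brokers [0..3] with replica counts 7, 6, 0, 0 rebalanced to 4, 3, 3, 3 in 6 moves) can be sketched in code. This is an illustrative toy computing fair target counts and the move count, not the KAFKA-1792 patch:

```java
import java.util.Arrays;

public class FairRebalance {
    // Given current replica counts per broker, compute fair target counts:
    // every broker ends up with R or R+1 replicas (R = total / brokers).
    public static int[] fairTargets(int[] current) {
        int total = Arrays.stream(current).sum();
        int n = current.length;
        int base = total / n, extra = total % n; // `extra` brokers get base + 1
        int[] target = new int[n];
        for (int i = 0; i < n; i++) target[i] = base + (i < extra ? 1 : 0);
        return target;
    }

    // Each surplus replica on an over-assigned broker moves exactly once,
    // so the minimal move count is the sum of the positive surpluses.
    public static int moves(int[] current, int[] target) {
        int moves = 0;
        for (int i = 0; i < current.length; i++)
            moves += Math.max(0, current[i] - target[i]);
        return moves;
    }

    public static void main(String[] args) {
        int[] current = {7, 6, 0, 0};        // the example from the issue
        int[] target = fairTargets(current); // -> [4, 3, 3, 3]
        System.out.println(Arrays.toString(target)
                + " moves=" + moves(current, target)); // 6 moves
    }
}
```

For the example distribution this reproduces the numbers in the issue: targets [4, 3, 3, 3] and 6 moves, matching the stated minimum.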
[DISCUSSION] Boot dependency in the new producer
Hi all, I am not sure if we have discussed this before, but recently I realized that we have introduced a boot dependency on the Kafka servers specified by the bootstrap.servers config in the new producer. More specifically, although the old producer also has a similar config for specifying the broker list, it will not try to connect to those brokers until the first message send call is triggered; whereas the new producer will try to talk to them at construction time via:

update(Cluster.bootstrap(addresses), time.milliseconds());

I personally am neutral to this change, as in most cases the corresponding Kafka servers should be up and running before the producer clients are deployed, but there are still some corner cases when that is not true, for example some standalone deployment tests of an app embedded with some clients, etc. So I would like to bring this up to people's attention if we have not discussed it before: do we think it is OK to introduce this boot dependency in the new producer? -- Guozhang
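For illustration, the construction-time bootstrap step starts by parsing the bootstrap.servers string into socket addresses before any send happens. The class and method below are a hypothetical sketch of that parsing, not Kafka's actual client code:

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

public class Bootstrap {
    // Parse a "host1:port1,host2:port2" bootstrap.servers string into socket
    // addresses, roughly the input the producer turns into
    // Cluster.bootstrap(addresses) inside its constructor.
    public static List<InetSocketAddress> parse(String servers) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String url : servers.split(",")) {
            int colon = url.lastIndexOf(':');
            String host = url.substring(0, colon);
            int port = Integer.parseInt(url.substring(colon + 1));
            // createUnresolved avoids a DNS lookup in this sketch
            addresses.add(InetSocketAddress.createUnresolved(host, port));
        }
        return addresses;
    }
}
```

The behavioral difference Guozhang describes is when the client first *uses* these addresses: at construction (new producer) versus on the first send (old producer), which is what creates the boot-order dependency.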
Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
Hi Kafka Team, I just wanted to bring to your attention a limitation of the new Java producer compared to the old producer. a) Increasing partitions is limited by the configured memory allocation: buffer.memory and batch.size. The maximum number of partitions you can have before impacting producers (with the new Java producer) is buffer.memory / batch.size. So developers need to plan for horizontal partition scaling from the beginning, otherwise running production code will be impacted, depending on the *block.on.buffer.full* configuration (block or BufferExhaustedException). This limitation does not exist with the old Scala-based producer. Buffering more would let the user community plan capacity beforehand. Maybe add this info about the limitation to http://kafka.apache.org/documentation.html#newproducerconfigs. Thanks, Bhavesh

On Mon, Jan 26, 2015 at 10:28 AM, Joe Stein joe.st...@stealth.ly wrote: +1 (binding) artifacts and quick start look good. I ran it in some client code, minor edits from 0.8.2-beta https://github.com/stealthly/scala-kafka/pull/26

On Mon, Jan 26, 2015 at 3:38 AM, Manikumar Reddy ku...@nmsworks.co.in wrote: +1 (Non-binding) Verified source package, unit tests, release build, topic deletion, compaction and random testing

On Mon, Jan 26, 2015 at 6:14 AM, Neha Narkhede n...@confluent.io wrote: +1 (binding) Verified keys, quick start, unit tests.

On Sat, Jan 24, 2015 at 4:26 PM, Joe Stein joe.st...@stealth.ly wrote: That makes sense, thanks!

On Sat, Jan 24, 2015 at 7:00 PM, Jay Kreps jay.kr...@gmail.com wrote: But I think the flaw in trying to guess what kind of serializer they will use is when we get it wrong. Basically let's say we guess String. Say 30% of the time we will be right and we will save the two configuration lines.
70% of the time we will be wrong and the user gets a super cryptic "ClassCastException: xyz cannot be cast to [B" (because [B is how Java chooses to display the byte array class, just to up the pain). Then they figure out how to subscribe to our mailing list and email us the cryptic exception, then we explain how we helpfully set these properties for them to save them time. :-) https://www.google.com/?gws_rd=ssl#q=kafka+classcastexception+%22%5BB%22 I think basically we did this experiment with the old clients, and the conclusion is that serialization is something you basically have to think about to use Kafka, and trying to guess just makes things worse. -Jay

On Sat, Jan 24, 2015 at 2:51 PM, Joe Stein joe.st...@stealth.ly wrote: Maybe. I think the StringSerializer could look more like a typical type of message. Instead of encoding being a property, it would more typically just be written in the bytes.

On Sat, Jan 24, 2015 at 12:12 AM, Jay Kreps jay.kr...@gmail.com wrote: I don't think so--see if you buy my explanation. We previously defaulted to the byte array serializer and it was a source of unending frustration and confusion. Since it wasn't a required config, people just went along plugging in whatever objects they had, thinking that changing the parametric types would somehow help. Then they would get a class cast exception and assume our stuff was somehow busted, not realizing we had helpfully configured a type different from what they were passing in under the covers. So I think it is actually good for people to think: how am I serializing my data? And getting that exception will make them ask that question, right? -Jay

On Fri, Jan 23, 2015 at 9:06 PM, Joe Stein joe.st...@stealth.ly wrote: Should value.serializer in the new Java producer be defaulted to Array[Byte]? I was working on testing some upgrade paths and got this!
return exception in callback when buffer cannot accept message
ConfigException: Missing required configuration "value.serializer" which has no default value.
org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
org.apache.kafka.common.config.AbstractConfig.init(AbstractConfig.java:48)
org.apache.kafka.clients.producer.ProducerConfig.init(ProducerConfig.java:235)
org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:129)
ly.stealth.testing.BaseSpec$class.createNewKafkaProducer(BaseSpec.scala:42)
ly.stealth.testing.KafkaSpec.createNewKafkaProducer(KafkaSpec.scala:36)
ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:175)
ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:170)
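On Bhavesh's point at the top of the thread, the bound is simple arithmetic: buffer.memory divided by batch.size. Using the new producer's defaults of the time (buffer.memory = 32 MB, batch.size = 16 KB; treat these values as illustrative), a sketch:

```java
public class PartitionBound {
    // Rough upper bound on the number of partitions a producer can batch to
    // concurrently before the record accumulator runs out of memory:
    // buffer.memory / batch.size (each partition may hold one full batch).
    public static long maxPartitions(long bufferMemory, long batchSize) {
        return bufferMemory / batchSize;
    }

    public static void main(String[] args) {
        long bufferMemory = 32L * 1024 * 1024; // 32 MB buffer.memory
        long batchSize = 16L * 1024;           // 16 KB batch.size
        System.out.println(maxPartitions(bufferMemory, batchSize)); // 2048
    }
}
```

With these numbers a producer writing to more than 2048 partitions can exhaust the buffer, at which point behavior depends on block.on.buffer.full (block or throw BufferExhaustedException), which is the capacity-planning point Bhavesh raises.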
Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
+1 (binding) Verified quick start and unit tests. Thanks, Jun

On Wed, Jan 21, 2015 at 8:28 AM, Jun Rao j...@confluent.io wrote: This is the second candidate for release of Apache Kafka 0.8.2.0. There have been some changes since the 0.8.2 beta release, especially in the new Java producer API and JMX MBean names. It would be great if people can test this out thoroughly.

Release Notes for the 0.8.2.0 release: https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html

*** Please download, test and vote by Monday, Jan 26th, 7pm PT

Kafka's KEYS file containing PGP keys we use to sign the release: http://kafka.apache.org/KEYS (in addition to the md5, sha1 and sha2 (SHA256) checksums).

* Release artifacts to be voted upon (source and binary): https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/
* Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/
* scala-doc: https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/scaladoc/
* java-doc: https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/javadoc/
* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c (commit 0b312a6b9f0833d38eec434bfff4c647c1814564)

/*** Thanks, Jun
[jira] [Commented] (KAFKA-1861) Publishing kafka-client:test in order to utilize the helper utils in TestUtils
[ https://issues.apache.org/jira/browse/KAFKA-1861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14292169#comment-14292169 ] Navina Ramesh commented on KAFKA-1861: -- [~nehanarkhede] I don't see it in Maven central. Looks like it has not been published to the repo. [~omkreddy] Thanks for doing this! Publishing kafka-client:test in order to utilize the helper utils in TestUtils -- Key: KAFKA-1861 URL: https://issues.apache.org/jira/browse/KAFKA-1861 Project: Kafka Issue Type: Bug Reporter: Navina Ramesh Assignee: Manikumar Reddy Attachments: KAFKA-1861.patch Related to SAMZA-227 (Upgrade KafkaSystemProducer to new java-based Kafka API) It turns out that some of the utilities that are helpful in writing unit tests are available in org.apache.kafka.test.TestUtils.java (:kafka-clients). This is not published to the Maven repository. Hence, we are forced to reproduce the same code in Samza. This can be avoided if the test package is published to the Maven repo. For example, we are creating a customized MockProducer to be used in Samza unit tests, and access to these quick helper utils would be useful. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
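For context, publishing test helpers from a Gradle build usually means attaching a test-classified jar. A hedged sketch of the idea (the task name and wiring are illustrative, not the actual patch attached to KAFKA-1861):

```groovy
// Illustrative only: bundle the test sources' output (e.g. TestUtils)
// into a "-test" jar that can be published alongside the main artifact.
task testJar(type: Jar) {
    classifier = 'test'
    from sourceSets.test.output
}

artifacts {
    archives testJar
}
```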
Re: What to do when file.rename fails?
Agree with Sriram / Jun, I think the error should be treated as fatal and we should shut down the broker gracefully. On Mon, Jan 26, 2015 at 8:41 AM, Jun Rao j...@confluent.io wrote: We probably can default the log dir to a relative path, something like ../kafka-logs. As for I/O errors on rename, I agree that we probably should just shut down the broker since it's not expected to happen. Thanks, Jun On Mon, Jan 26, 2015 at 5:54 AM, Jaikiran Pai jai.forums2...@gmail.com wrote: Having looked at the logs the user posted, I don't think this specific issue has to do with the /tmp path. However, now that the /tmp path is being discussed, I think it's a good idea that we default the Kafka logs to a certain folder. As Jay notes, it makes it very easy to just download and start the servers without having to fiddle with the configs when you are just starting out. Having said that, when I started out with Kafka, I found /tmp to be an odd place to default the path to. I expected it to default to a folder within the Kafka install, somewhere like a KAFKA_INSTALL_FOLDER/data/kafka-logs/ folder. Is that something we should do? -Jaikiran On Monday 26 January 2015 12:23 AM, Jay Kreps wrote: Hmm, but I don't think tmp gets cleaned while the server is running... The reason for using tmp was because we don't know which directory they will use and we don't want them to have to edit configuration for the simple out-of-the-box getting started tutorial. I actually do think that is important. Maybe an intermediate step we could do is just call out this setting in the quickstart so people know where data is going and know they need to configure it later... -Jay On Sun, Jan 25, 2015 at 9:32 AM, Joe Stein joe.st...@stealth.ly wrote: This feels like another type of symptom from people using /tmp/ for their logs. Personally, I would rather use /mnt/data or something, and if that doesn't exist on their machine we can throw an exception, or have no default and force it to be set.
/*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Jan 25, 2015 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote: I think you are right, good catch. It could be that this user deleted the files manually, but I wonder if there isn't some way that is a Kafka bug--e.g. if multiple types of retention policies kick in at the same time, do we synchronize that properly? -Jay On Sat, Jan 24, 2015 at 9:26 PM, Jaikiran Pai jai.forums2...@gmail.com wrote: Hi Jay, I spent some more time on this today and went back to the original thread which brought up the issue with file leaks [1]. I think the lsof output in those logs has a very important hint:
/home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/68818668.log (deleted)
java 8446 root 725u REG 253,2 536910838 26087364 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/69457098.log (deleted)
java 8446 root 726u REG 253,2 536917902 26087368
Notice the (deleted) text in that output. The last time I looked at that output, I thought it was the user who had added that "deleted" text to help us understand the problem. But today I read up on the output format of lsof, and it turns out that it's lsof itself which adds that hint whenever a file has already been deleted (possibly by a different process) but some other process is still holding open resources of that (deleted) file [2]. So in the context of the issue that we are discussing, and the way Kafka deals with async deletes (i.e. first by attempting a rename of the log/index files), I think this all makes sense now. So what I think is happening is: some (other?) process (not sure what/why) has already deleted the log file that Kafka is using for the LogSegment. The LogSegment however still has an open FileChannel resource on that deleted file (and that's why the open file descriptor is held open and shows up in that output). Now Kafka, at some point in time, triggers an async delete of the LogSegment, which involves a file rename of that (already deleted) log file. The rename fails (because the original file path isn't there anymore). As a result, we end up throwing that "failed to rename" KafkaStorageException and thus leave behind the open FileChannel, which stays open forever (till the Kafka process exits). So I think we should: 1) Find out what deletes the underlying log file(s), and why. I'll add a reply to that original mail discussion asking the user if he can provide more details. 2) Handle this case and close the FileChannel. The patch that's been uploaded to review board
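Jaikiran's reading of the lsof output can be reproduced with plain JDK code. A hedged sketch, assuming POSIX semantics (the deleted file's inode survives while a channel holds it, and a rename of the vanished path fails):

```java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DeletedFileDemo {
    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("segment", ".log");
        FileChannel channel = FileChannel.open(log, StandardOpenOption.WRITE);
        Files.delete(log); // simulates the external deletion

        // The open channel still works: the inode lives until the last fd
        // closes, which is why lsof shows the path as "(deleted)".
        channel.write(ByteBuffer.wrap("still writable".getBytes()));

        // A rename of the original path fails, as in Kafka's async delete.
        boolean renamed = log.toFile().renameTo(new File(log + ".deleted"));
        System.out.println("rename succeeded: " + renamed);

        channel.close(); // skipping this close is exactly the fd leak
    }
}
```

The leak described in the thread is precisely the case where the final `close()` is skipped because the rename failure throws first.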
Re: Review Request 30196: Patch for KAFKA-1886
On Jan. 26, 2015, 1:28 a.m., Neha Narkhede wrote: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala, line 235 https://reviews.apache.org/r/30196/diff/1/?file=831148#file831148line235 what is the purpose of this sleep? I wanted to make sure the SimpleConsumer was making a request to the broker when I interrupted. I can reduce the sleep time to 100ms if that helps. - Aditya --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30196/#review69579 --- On Jan. 22, 2015, 10:35 p.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30196/ --- (Updated Jan. 22, 2015, 10:35 p.m.) Review request for kafka and Joel Koshy. Bugs: KAFKA-1886 https://issues.apache.org/jira/browse/KAFKA-1886 Repository: kafka Description --- Fixing KAFKA-1886. Forcing the SimpleConsumer to throw a ClosedByInterruptException if thrown and not retry Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk Diffs - core/src/main/scala/kafka/consumer/SimpleConsumer.scala cbef84ac76e62768981f74e71d451f2bda995275 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala a5386a03b62956bc440b40783247c8cdf7432315 Diff: https://reviews.apache.org/r/30196/diff/ Testing --- Added an integration test to PrimitiveAPITest.scala. Thanks, Aditya Auradkar
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
Hi Jay and Neha, Thanks a lot for the reply and explanation. I do agree it makes more sense to avoid duplicate effort and plan based on the new consumer. I'll modify the KIP. To Jay's question on message ordering - The data channel selection makes sure that the messages from the same source partition will be sent by the same producer. So the order of the messages is guaranteed with proper producer settings (MaxInFlightRequests=1, retries=Integer.MaxValue, etc.) For keyed messages, because they come from the same source partition and will end up in the same target partition, as long as they are sent by the same producer, the order is guaranteed. For non-keyed messages, the messages coming from the same source partition might go to different target partitions. The order is only guaranteed within each partition. Anyway, I'll modify the KIP and the data channel will go away. Thanks. Jiangjie (Becket) Qin On 1/25/15, 4:34 PM, Neha Narkhede n...@confluent.io wrote: I think there is some value in investigating if we can go back to the simple mirror maker design, as Jay points out. Here you have N threads, each of which has a consumer and a producer. The reason why we had to move away from that was a combination of the difference in throughput between the consumer and the old producer and the deficiency of the consumer rebalancing that limits the total number of mirror maker threads. So the only option available was to increase the throughput of the limited # of mirror maker threads that could be deployed. Now that queuing design may not make sense, if the new producer's throughput is almost similar to the consumer AND the fact that the new round-robin based consumer rebalancing can allow a very high number of mirror maker instances to exist. This is the end state that the mirror maker should be in once the new consumer is complete, so it wouldn't hurt to see if we can just move to that right now.
On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps jay.kr...@gmail.com wrote: QQ: If we ever use a different technique for the data channel selection than for the producer partitioning won't that break ordering? How can we ensure these things stay in sync? With respect to the new consumer--I really do want to encourage people to think through how MM will work with the new consumer. I mean this isn't very far off, maybe a few months if we hustle? I could imagine us getting this mm fix done maybe sooner, maybe in a month? So I guess this buys us an extra month before we rip it out and throw it away? Maybe two? This bug has been there for a while, though, right? Is it worth it? Probably it is, but it still kind of sucks to have the duplicate effort. So anyhow let's definitely think about how things will work with the new consumer. I think we can probably just have N threads, each thread has a producer and consumer and is internally single threaded. Any reason this wouldn't work? -Jay On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, Thanks for the comments. Please see inline responses. Jiangjie (Becket) Qin On 1/21/15, 1:33 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, A couple questions/comments: 1. The callback and user-controlled commit offset functionality is already in the new consumer which we are working on in parallel. If we accelerated that work it might help concentrate efforts. I admit this might take slightly longer in calendar time but could still probably get done this quarter. Have you guys considered that approach? Yes, I totally agree that ideally we should put efforts on the new consumer. The main reason for still working on the old consumer is that we expect it would still be used in LinkedIn for quite a while before the new consumer could be fully rolled out. And we have recently been suffering a lot from a mirror maker data loss issue. So our current plan is making the necessary changes to make the current mirror maker stable in production.
Then we can test and roll out the new consumer gradually without getting burnt. 2. I think partitioning on the hash of the topic partition is not a very good idea because that will make the case of going from a cluster with fewer partitions to one with more partitions not work. I think an intuitive way to do this would be the following: a. Default behavior: Just do what the producer does. I.e. if you specify a key use it for partitioning, if not just partition in a round-robin fashion. b. Add a --preserve-partition option that will explicitly inherit the partition from the source irrespective of whether there is a key or which partition that key would hash to. Sorry that I did not explain this clearly enough. The hash of the topic partition is only used to decide which mirror maker data channel queue the consumer thread should put the message into. It only tries to make sure the messages from the same partition are sent by the same producer thread to guarantee the
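The queue selection Becket describes can be sketched as a pure function. This is a hypothetical illustration, not the actual mirror maker code — the point is only that a (topic, partition) pair deterministically maps to one data channel queue, and hence one producer thread:

```java
public class DataChannelSelect {
    // Pick the data channel queue for a source (topic, partition). Because the
    // mapping is deterministic, all messages from one source partition are
    // handed to the same producer thread, preserving their relative order.
    static int selectQueue(String topic, int partition, int numQueues) {
        int hash = (topic + "-" + partition).hashCode();
        return (hash & 0x7fffffff) % numQueues; // force a non-negative index
    }

    public static void main(String[] args) {
        int first = selectQueue("mytopic", 3, 8);
        int second = selectQueue("mytopic", 3, 8);
        System.out.println("same queue for same partition: " + (first == second));
    }
}
```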
Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
+1 (binding) artifacts and quick start look good. I ran it in some client code, with minor edits from 0.8.2-beta: https://github.com/stealthly/scala-kafka/pull/26 On Mon, Jan 26, 2015 at 3:38 AM, Manikumar Reddy ku...@nmsworks.co.in wrote: +1 (Non-binding) Verified source package, unit tests, release build, topic deletion, compaction and random testing On Mon, Jan 26, 2015 at 6:14 AM, Neha Narkhede n...@confluent.io wrote: +1 (binding) Verified keys, quick start, unit tests. On Sat, Jan 24, 2015 at 4:26 PM, Joe Stein joe.st...@stealth.ly wrote: That makes sense, thanks! On Sat, Jan 24, 2015 at 7:00 PM, Jay Kreps jay.kr...@gmail.com wrote: But I think the flaw in trying to guess what kind of serializer they will use is when we get it wrong. Basically let's say we guess String. Say 30% of the time we will be right and we will save the two configuration lines. 70% of the time we will be wrong and the user gets a super cryptic ClassCastException: "xyz cannot be cast to [B" (because [B is how java chooses to display the byte array class, just to up the pain), then they figure out how to subscribe to our mailing list and email us the cryptic exception, then we explain how we helpfully set these properties for them to save them time. :-) https://www.google.com/?gws_rd=ssl#q=kafka+classcastexception+%22%5BB%22 I think basically we did this experiment with the old clients and the conclusion is that serialization is something you basically have to think about to use Kafka, and trying to guess just makes things worse. -Jay On Sat, Jan 24, 2015 at 2:51 PM, Joe Stein joe.st...@stealth.ly wrote: Maybe. I think the StringSerializer could look more like a typical type of message. Instead of encoding being a property it would more typically just be written in the bytes. On Sat, Jan 24, 2015 at 12:12 AM, Jay Kreps jay.kr...@gmail.com wrote: I don't think so--see if you buy my explanation.
We previously defaulted to the byte array serializer and it was a source of unending frustration and confusion. Since it wasn't a required config people just went along plugging in whatever objects they had, and thinking that changing the parametric types would somehow help. Then they would get a class cast exception and assume our stuff was somehow busted, not realizing we had helpfully configured a type different from what they were passing in under the covers. So I think it is actually good for people to think: how am I serializing my data, and getting that exception will make them ask that question, right? -Jay On Fri, Jan 23, 2015 at 9:06 PM, Joe Stein joe.st...@stealth.ly wrote: Should value.serializer in the new java producer be defaulted to Array[Byte]? I was working on testing some upgrade paths and got this: return exception in callback when buffer cannot accept message ConfigException: Missing required configuration "value.serializer" which has no default value. (ConfigDef.java:124) org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) org.apache.kafka.common.config.AbstractConfig.init(AbstractConfig.java:48) org.apache.kafka.clients.producer.ProducerConfig.init(ProducerConfig.java:235) org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:129) ly.stealth.testing.BaseSpec$class.createNewKafkaProducer(BaseSpec.scala:42) ly.stealth.testing.KafkaSpec.createNewKafkaProducer(KafkaSpec.scala:36) ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:175) ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:170) On Fri, Jan 23, 2015 at 5:55 PM, Jun Rao j...@confluent.io wrote: This is a reminder that the deadline for the vote is this Monday, Jan 26, 7pm PT. Thanks, Jun On Wed, Jan 21, 2015 at 8:28 AM, Jun Rao j...@confluent.io wrote: This is the second candidate for release of Apache Kafka 0.8.2.0.
[...]
Re: What to do when file.rename fails?
Also, I think I agree that shutting down is the right behavior. I think the real thing to do though is probably to debug that user's case and figure out if it is something inside kafka that is leading to double delete or if the files are getting deleted by something else they don't know about. There may be a larger issue where we have bugs that can lead to double deleting a log segment which currently is innocuous but if we made it shut down the server would be quite severe. -Jay On Mon, Jan 26, 2015 at 1:06 PM, Jay Kreps jay.kr...@gmail.com wrote: [...]
Re: What to do when file.rename fails?
Having a relative path and keeping data under /data in the kafka distro would make sense. This would require some reworking of the shell scripts, though, as I think right now you can actually run Kafka from any directory and the cwd of the process will be whatever directory you start from. If we have a relative path in the config then the working directory will HAVE to be the kafka directory. This works for the simple download case but may make some packaging stuff harder for other use cases. -Jay On Mon, Jan 26, 2015 at 5:54 AM, Jaikiran Pai jai.forums2...@gmail.com wrote: [...]
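The fix discussed in this thread — closing the FileChannel even when the rename backing the async delete fails — can be sketched in plain JDK terms. LogSegmentLike is a stand-in class for illustration, not Kafka's actual LogSegment:

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RenameFailureDemo {
    static class LogSegmentLike {
        final File file;
        final FileChannel channel;

        LogSegmentLike(File file, FileChannel channel) {
            this.file = file;
            this.channel = channel;
        }

        void delete() throws IOException {
            try {
                File renamed = new File(file.getPath() + ".deleted");
                if (!file.renameTo(renamed)) {
                    throw new IOException("failed to rename " + file);
                }
            } finally {
                channel.close(); // released even on rename failure: no fd leak
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("segment", ".log");
        FileChannel ch = FileChannel.open(log, StandardOpenOption.WRITE);
        Files.delete(log); // simulate the external deletion seen in the lsof output

        LogSegmentLike segment = new LogSegmentLike(log.toFile(), ch);
        try {
            segment.delete();
        } catch (IOException expected) {
            System.out.println("rename failed, channel open: " + ch.isOpen());
        }
    }
}
```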
Re: [DISCUSSION] Boot dependency in the new producer
Hey Guozhang, That line shouldn't cause any connections to Kafka to be established, should it? All that is doing is creating the Cluster pojo using the supplied addresses. The use of InetSocketAddress may cause some dns stuff to happen, though... -Jay On Mon, Jan 26, 2015 at 10:50 AM, Guozhang Wang wangg...@gmail.com wrote: Hi all, I am not sure if we have discussed this before, but recently I realized that we have introduced a boot dependency on the kafka server specified by the bootstrap.servers config in the new producer. More specifically, although in the old producer we also have a similar config for specifying the broker list, the producer will not try to connect to those brokers until the first message send call is triggered; whereas in the new producer, it will try to talk to them at construction time via: update(Cluster.bootstrap(addresses), time.milliseconds()); I personally am neutral to this change, as in most cases the corresponding kafka server should be up and running before the producer clients are deployed, but there are still some corner cases when it is not true, for example some standalone deployment tests of the app embedded with some clients, etc. So I would like to bring this up to people's attention if we have not discussed it before: do we think it is OK to introduce this boot dependency in the new producer? -- Guozhang
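On the "dns stuff" point: the two-argument InetSocketAddress constructor does attempt name resolution immediately, which may be the only boot-time side effect here. A small JDK-only sketch contrasting the two behaviors:

```java
import java.net.InetSocketAddress;

public class BootstrapResolutionDemo {
    public static void main(String[] args) {
        // The (host, port) constructor resolves the hostname right away.
        InetSocketAddress eager = new InetSocketAddress("localhost", 9092);
        System.out.println("eagerly resolved: " + !eager.isUnresolved());

        // createUnresolved defers resolution until the address is used.
        InetSocketAddress lazy =
            InetSocketAddress.createUnresolved("localhost", 9092);
        System.out.println("deferred unresolved: " + lazy.isUnresolved());
    }
}
```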
Re: What to do when file.rename fails?
I think that most packages already default log.dir to something more reasonable. On Mon, Jan 26, 2015 at 1:06 PM, Jay Kreps jay.kr...@gmail.com wrote: [...]
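A relative log-dir default, as floated earlier in the thread, would be a one-line change in config/server.properties. A hedged sketch (the relative path resolves against the broker process's working directory, which is why the shell scripts would need reworking):

```properties
# Hypothetical default: keep data inside the Kafka install instead of /tmp.
log.dirs=../kafka-logs
```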
Re: [DISCUSS] KIPs
Sorry for the late response, Magnus. See my comments inline: On Fri, Jan 23, 2015 at 7:31 AM, Magnus Edenhill mag...@edenhill.se wrote: Wouldn't it make sense to move away from these rich binary broker descriptors ({ host, port, proto }) (which require protocol churning on change), and simply use URIs instead? We use different representations in different places, so I'm not sure which one you mean... Our clients will use host:port and --security.protocol in the configuration to specify the protocol (this is to make it easier to ensure one protocol per client). ZK registration is in JSON. Internal objects are binary, since parsing the URI over and over will be a PITA. The wire protocol doesn't include a security protocol (since we don't negotiate it), so it's still host+port, as it always was. E.g.:
kafka://host[:port]/ -- cleartext proto on standard port 9092
kafkas://host[:port] -- SSL enveloped proto on standard port 9093
kafkas://user:pass@host[:port]/ -- SSL enveloped, with user authentication
.. kafkafuturetech://.../#opts -- six months from now.
Trailing #fragment_ids could be used to hint the client on protocol versions, supported authentications, etc. This also makes error reporting more meaningful on the client, e.g. compare: "Unsupported protocol 19 on broker foo:1234" to "Unsupported protocol kafkafuturetech on broker foo:1234". I agree that the second error is more readable, but I'm not sure why you think it's currently unfeasible on clients? A positive side effect would be a more generalized topic addressing in clients: kafkacat kafka://bootstrap/mytopic/3?offset=end -- tail partition 3 of mytopic Clients can pretty much do what they want, right? As long as they call Kafka with the right configuration, it's up to them to decide how to accept arguments. Just an idea, Magnus 2015-01-23 5:43 GMT+01:00 Jun Rao j...@confluent.io: Reviewed the latest patch in KAFKA-1809 :).
Thanks, Jun On Thu, Jan 22, 2015 at 12:38 PM, Gwen Shapira gshap...@cloudera.com wrote: Thanks for validating our ideas. Updated the KIP with the workflow. Now if you can nudge Jun to review the latest patch... ;) On Thu, Jan 22, 2015 at 11:44 AM, Jay Kreps j...@confluent.io wrote: Oh yeah I think that is better, I hadn't thought of that approach! Any way you could describe the usage in the KIP, just for completeness? -Jay On Thu, Jan 22, 2015 at 10:23 AM, Gwen Shapira gshap...@cloudera.com wrote: I think what you described was the original design, so no wonder you are confused :) Following suggestions from Jun, I changed it a bit. The current model is: - Clients (producers and consumers) need to know about the broker ports in advance. They don't need to know about all brokers, but they need to know at least one host:port pair that speaks the protocol they want to use. The change is that all host:port pairs in broker.list must be of the same protocol and match the security.protocol configuration parameter. - Client uses security.protocol configuration parameter to open a connection to one of the brokers and sends the good old MetadataRequest. The broker knows which port it got the connection on, therefore it knows which security protocol is expected (it needs to use the same protocol to accept the connection and respond), and therefore it can send a response that contains only the host:port pairs that are relevant to that protocol. - From the client side the MetadataResponse did not change - it contains a list of brokerId,host,port that the client can connect to. The fact that all those broker endpoints were chosen out of a larger collection to match the right protocol is irrelevant for the client. I really like the new design since it preserves a lot of the same configurations and APIs. Thoughts? Gwen On Thu, Jan 22, 2015 at 9:19 AM, Jay Kreps jay.kr...@gmail.com wrote: I think I am still confused. 
In addition to the UpdateMetadataRequest, don't we have to change the MetadataResponse so that it's possible for clients to discover the new ports? Or is that a second phase? I was imagining it worked by basically allowing the brokers to advertise multiple ports, one per security type, and then in the client you configure a protocol which will implicitly choose the port from the options returned in the metadata to you... Likewise in the ConsumerMetadataResponse we are currently giving back full broker information. I think we would have two options here: either change the broker information included in that response to match the MetadataResponse, or else remove the broker information entirely and just return the node id (since in order to use that request you would already have to have the cluster metadata). The second option may be cleaner since it means we
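The filtering Gwen describes above - each broker advertises one endpoint per security protocol, and the metadata response returns only the endpoints that match the protocol of the port the request arrived on - can be sketched with plain collections. All names and data structures here are hypothetical, not the actual Kafka classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EndpointFilter {

    // For each broker id, keep a protocol -> host:port mapping and return only
    // the endpoints matching the protocol of the inbound connection.
    static List<String> metadataResponse(Map<Integer, Map<String, String>> brokers,
                                         String requestProtocol) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, Map<String, String>> b : brokers.entrySet()) {
            String hostPort = b.getValue().get(requestProtocol);
            if (hostPort != null) {
                out.add(b.getKey() + "=" + hostPort);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, String>> brokers = new TreeMap<>();
        brokers.put(1, Map.of("PLAINTEXT", "b1:9092", "SSL", "b1:9093"));
        brokers.put(2, Map.of("PLAINTEXT", "b2:9092"));
        // A client that connected over the SSL port only sees SSL endpoints,
        // so broker 2 (which has no SSL listener) is simply absent.
        System.out.println(metadataResponse(brokers, "SSL"));
    }
}
```

From the client's perspective the response shape is unchanged - it is still a list of brokerId/host/port entries - which is exactly why this design avoids breaking the existing MetadataResponse.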
Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
Hi, I don't use Graphite, so I don't know. Kyle, maybe you can share more info? What do you mean by "reported to Yammer", for example? And when you say Yammer/Graphite, are you trying to say that you are using the GraphiteReporter? If so, can you try other Yammer reporters and see if there is a more general issue or something limited to either Graphite or the GraphiteReporter? I am pretty sure we are able to see all Kafka 0.8.2 metrics nicely in SPM (in a non-public version of the Kafka monitoring agent). Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Mon, Jan 26, 2015 at 7:37 PM, Jun Rao j...@confluent.io wrote: Hmm, that's not the intention. The per-topic mbeans are definitely registered by Yammer. So, not sure why it's not reported to Graphite. Otis, Vladimir, Do you guys know? Thanks, Jun On Mon, Jan 26, 2015 at 4:08 PM, Kyle Banker kyleban...@gmail.com wrote: This is still preliminary, but it looks as if the change to metric names for per-topic metrics (bytes/messages in/out) is preventing these metrics from being reported to Yammer/Graphite. If this isn't intentional, it should probably be addressed before release. On Wed, Jan 21, 2015 at 9:28 AM, Jun Rao j...@confluent.io wrote: This is the second candidate for release of Apache Kafka 0.8.2.0. There have been some changes since the 0.8.2 beta release, especially in the new Java producer API and JMX mbean names. It would be great if people can test this out thoroughly. Release Notes for the 0.8.2.0 release https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html *** Please download, test and vote by Monday, Jan 26th, 7pm PT Kafka's KEYS file containing PGP keys we use to sign the release: http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2 (SHA256) checksums.
* Release artifacts to be voted upon (source and binary): https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/ * Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/ * scala-doc https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/scaladoc/ * java-doc https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/javadoc/ * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c (commit 0b312a6b9f0833d38eec434bfff4c647c1814564) /*** Thanks, Jun
Re: Kafka Out of Memory error
Thanks a lot Gwen. I bumped up the JVM to 1g on the consumer side and it works :) All the consumers belong to the same group and I am using the high-level group API to consume from Kafka. It seems there is some initial metadata exchange, or something where information about all the partitions is sent to all the consumers. Also, I launch 10 consumers from each machine at a time and keep adding until I reach 200 consumers. I see that the initial consumers seem to require a lot more CPU and memory at first. Should I launch all the consumers in one go instead of adding 10 at a time? On a different issue, I couldn't find any way of keeping the current read offset metadata while using the high-level API (I am using the gem mentioned in the earlier mail). Is there any way to record the current read metadata periodically to monitor the progress of the consumer? Further, every time a consumer dies and restarts it seems to start reading from the beginning; is there any way to resume from the last read offsets? Thanks -Pranay On Mon, Jan 19, 2015 at 6:54 PM, Gwen Shapira gshap...@cloudera.com wrote: Two things: 1. The OOM happened on the consumer, right? So the memory that matters is the RAM on the consumer machine, not on the Kafka cluster nodes. 2. If the consumers belong to the same consumer group, each will consume a subset of the partitions and will only need to allocate memory for those partitions. So, assuming all your consumers belong to the same group: 2 consumers - each has 500 partitions - each uses 500MB. The total remains 1GB no matter how many consumers you have, as long as they are all in the same group. If the consumers belong to different groups (i.e. they read copies of the same messages from the same partitions), then yes, you are limited to 8 per server (probably less, because there is other stuff on the server). Gwen On Mon, Jan 19, 2015 at 3:06 PM, Pranay Agarwal agarwalpran...@gmail.com wrote: Thanks a lot Natty.
I am using this Ruby gem on the client side with all the default config https://github.com/joekiller/jruby-kafka/blob/master/lib/jruby-kafka/group.rb and the value of fetch.message.max.bytes is set to 1MB. Currently I only have 3 nodes set up in the Kafka cluster (with 8 GB RAM), and if 1 consumer is going to take 1000 partitions x 1MB ~ 1GB, does it mean 1 Kafka node can at best support 8 consumers? Also, when I do top/free on the Kafka cluster nodes (both ZooKeeper and Kafka are deployed on each of the 3 nodes of the cluster) I don't see a lot of memory being used on the machine. Also, even with this calculation, I shouldn't be facing any issue with only 1 consumer, as I have 8GB of JVM space given to the Kafka nodes, right? Thanks -Pranay On Mon, Jan 19, 2015 at 2:53 PM, Jonathan Natkins na...@streamsets.com wrote: The fetch.message.max.bytes setting is actually a client-side configuration. With regard to increasing the number of threads, I think the calculation may be a little more subtle than what you're proposing, and frankly, it's unlikely that your servers can handle allocating 200MB x 1000 threads = 200GB of memory at a single time. I believe that if you have every partition on a single broker, and all of your consumer threads are requesting data simultaneously, then yes, the broker would attempt to allocate 200GB of heap, and probably you'll hit an OOME. However, since each consumer is only reading from one partition, those 1000 threads should be making requests that are spread out over the entire Kafka cluster. Depending on the memory on your servers, you may need to increase the number of brokers in your cluster to support the 1000 threads. For example, I would expect that you can support this with 10 brokers if each broker has something north of 20GB of heap allocated. Some of this is a little bit of guesswork on my part, and I'm not super confident of my numbers... Can anybody else on the list validate my math?
Thanks, Natty Jonathan "Natty" Natkins StreamSets | Customer Engagement Engineer mobile: 609.577.1600 | linkedin http://www.linkedin.com/in/nattyice On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal agarwalpran...@gmail.com wrote: Thanks Natty. Is there any config which I need to change on the client side as well? Also, currently I am trying with only 1 consumer thread. Does the equation change to (#partitions)*(fetchsize)*(#consumer_threads) in case I try to read with 1000 threads from topic2 (1000 partitions)? -Pranay On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins na...@streamsets.com wrote: Hi Pranay, I think the JIRA you're referencing is a bit orthogonal to the OOME that you're experiencing. Based on the stacktrace, it looks like your OOME is coming from a consumer request, which is attempting to allocate 200MB. There was a thread (relatively recently) that discussed what I think is your issue:
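The back-of-envelope arithmetic from this thread - worst case, each consumer buffers (partitions it owns) x fetch size - can be written down directly. This is only an estimate under the thread's own simplifying assumption that partitions divide evenly across group members; the method name is made up:

```java
public class FetchMemoryEstimate {

    // Worst-case fetch buffer memory for one consumer in a group:
    // each member owns roughly totalPartitions / consumersInGroup partitions,
    // and may buffer one full fetch per owned partition.
    static long worstCaseBytes(int totalPartitions, int consumersInGroup,
                               long fetchMaxBytes) {
        int partitionsPerConsumer =
                (totalPartitions + consumersInGroup - 1) / consumersInGroup; // ceiling
        return (long) partitionsPerConsumer * fetchMaxBytes;
    }

    public static void main(String[] args) {
        long oneMb = 1024 * 1024;
        // 1000 partitions, a single consumer, 1 MB fetches -> ~1000 MB of buffers.
        System.out.println(worstCaseBytes(1000, 1, oneMb) / oneMb + " MB");
        // The same partitions spread over 10 group members -> ~100 MB each.
        System.out.println(worstCaseBytes(1000, 10, oneMb) / oneMb + " MB");
    }
}
```

This matches Gwen's point: adding consumers to the *same* group shrinks the per-consumer number without changing the group total, while each *additional* group pays the full amount again.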
[DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints
Hi Kafka Devs, While reviewing the patch for KAFKA-1809, we came across two questions that we are interested in hearing the community out on. 1. This patch changes the Broker class and adds a new class BrokerEndPoint that behaves like the previous broker. While technically kafka.cluster.Broker is not part of the public API, it is returned by javaapi, used with the SimpleConsumer. Getting replicas from PartitionMetadata will now return BrokerEndPoint instead of Broker. All method calls remain the same, but since we return a new type, we break the API. Note that this breakage does not prevent upgrades - existing SimpleConsumers will continue working (because we are wire-compatible). The only thing that won't work is building SimpleConsumers with dependency on Kafka versions higher than 0.8.2. Arguably, we don't want anyone to do it anyway :) So: Do we state that the highest release on which SimpleConsumers can depend is 0.8.2? Or shall we keep Broker as is and create an UberBroker which will contain multiple brokers as its endpoints? 2. The KIP suggests use.new.wire.protocol configuration to decide which protocols the brokers will use to talk to each other. The problem is that after the next upgrade, the wire protocol is no longer new, so we'll have to reset it to false for the following upgrade, then change to true again... and upgrading more than a single version will be impossible. Bad idea :) As an alternative, we can have a property for each version and set one of them to true. Or (simple, I think) have wire.protocol.version property and accept version numbers (0.8.2, 0.8.3, 0.9) as values. Please share your thoughts :) Gwen
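The `wire.protocol.version` idea in point 2 only needs an ordering over version strings, so a broker configured with, say, 0.8.3 can keep speaking the old format to peers below that version across any number of upgrades. A minimal sketch of such a comparison (illustrative only - the property name is from the proposal, and none of this is Kafka code):

```java
public class WireProtocolVersion {

    // Compare dotted version strings numerically, segment by segment,
    // treating missing segments as 0 (so "0.9" == "0.9.0").
    static int compare(String a, String b) {
        String[] as = a.split("\\.");
        String[] bs = b.split("\\.");
        for (int i = 0; i < Math.max(as.length, bs.length); i++) {
            int ai = i < as.length ? Integer.parseInt(as[i]) : 0;
            int bi = i < bs.length ? Integer.parseInt(bs[i]) : 0;
            if (ai != bi) {
                return Integer.compare(ai, bi);
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // A broker on wire.protocol.version=0.8.3 would use the new format
        // only when the peer's configured version is at least 0.8.3.
        System.out.println(compare("0.8.3", "0.8.2") > 0);
        System.out.println(compare("0.9", "0.8.3") > 0);
        System.out.println(compare("0.9", "0.9.0") == 0);
    }
}
```

Unlike a boolean `use.new.wire.protocol`, this never has to be flipped back and forth between upgrades - each release just raises the configured value once.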
Re: Review Request 30259: Add static code coverage reporting capability
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30259/#review69727 --- This is a nice improvement to the project. Thanks! core/src/main/scala/kafka/utils/ZkUtils.scala https://reviews.apache.org/r/30259/#comment114480 Are there any open issues against scoverage that would explain why it can't instrument this class? If not, it might be worth contacting that project to see if they have any ideas why it blows up on this class. It would probably be good to add a TODO explaining that once scoverage can process this class, the $COVERAGE-OFF$ should be removed. - Eric Olander On Jan. 25, 2015, 8:47 p.m., Ashish Singh wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30259/ --- (Updated Jan. 25, 2015, 8:47 p.m.) Review request for kafka. Bugs: KAFKA-1722 https://issues.apache.org/jira/browse/KAFKA-1722 Repository: kafka Description --- KAFKA-1722: Add static code coverage capability Diffs - build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 core/src/main/scala/kafka/utils/ZkUtils.scala c14bd455b6642f5e6eb254670bef9f57ae41d6cb Diff: https://reviews.apache.org/r/30259/diff/ Testing --- How to run: ./gradlew sonarRunner -PscalaVersion=2.11 Note that if you do not have SonarQube running on your system, the sonarRunner task will fail, but it will still have generated coverage reports for core and clients at core/build/reports/scoverage/ and clients/build/reports/jacocoHtml respectively. Open index.html in either of those dirs to see the coverage. Once gradle-scoverage starts publishing the scoverage report, a single report generated from Sonar will be available. Thanks, Ashish Singh
Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
This is still preliminary, but it looks as if the change to metric names for per-topic metrics (bytes/messages in/out) is preventing these metrics from being reported to Yammer/Graphite. If this isn't intentional, it should probably be addressed before release.
Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
Hmm, that's not the intention. The per-topic mbeans are definitely registered by Yammer. So, not sure why it's not reported to Graphite. Otis, Vladimir, Do you guys know? Thanks, Jun On Mon, Jan 26, 2015 at 4:08 PM, Kyle Banker kyleban...@gmail.com wrote: This is still preliminary, but it looks as if the change to metric names for per-topic metrics (bytes/messages in/out) is preventing these metrics from being reported to Yammer/Graphite. If this isn't intentional, it should probably be addressed before release.
[jira] [Commented] (KAFKA-1897) Enhance MockProducer for more sophisticated tests
[ https://issues.apache.org/jira/browse/KAFKA-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292748#comment-14292748 ] Navina Ramesh commented on KAFKA-1897: -- 1. Yes. We can use the Cluster constructor. However, it is, as you mentioned, inconvenient. A client may not always want an empty cluster. The client should be able to create a topic with a fixed number of partitions, without worrying about the details of how PartitionInfo or TopicPartition is structured within Kafka. *Solution*: Instead of creating multiple constructor interfaces for the MockProducer, you can expose helper methods for creating a cluster (similar to the ones in org/apache/samza/utils/TestUtils.java). This way the client can quickly create a cluster and customize it per the test requirements. A workaround for this issue has been addressed here - https://issues.apache.org/jira/browse/KAFKA-1861 2. I am unable to conclude whether this is a gap in functionality or documentation. In the Samza use case, we use a generic producer interface that wraps the Kafka producer. Due to this, we don't have access to the callback/future handles. Also, we have blocking calls that depend on the producer thread to change state and unblock. *Test Case*:
{code:borderStyle=solid}
MockProducer mock = new MockProducer(false)
KafkaSystemProducer p = new KafkaSystemProducer(producer=mock, ...)
p.send(test, msg1)
p.send(test, msg2)
p.flush()           // <-- Blocking on actual send!
mock.completeNext() // <-- Doesn't reach this point because this thread is blocked in flush
{code}
Without the MockProducer running on a parallel thread, it is difficult to make assertions. Assertions/intercepts have to be made outside the main test thread. *Current Approach*:
{code:borderStyle=solid}
CustomMockProducer mock = new CustomMockProducer(false)
KafkaSystemProducer p = new KafkaSystemProducer(producer=mock, ...)
p.send(test, msg1)
p.send(test, msg2)
mock.startSendThread(100) // Starts a thread that invokes the buffered callbacks (mock.callback())
p.flush() // <-- Blocking call; eventually unblocks when the mock producer thread completes the send.
          // If it doesn't unblock, the test has failed. If the send throws an exception,
          // we can intercept it here in the main thread itself.
assert(...) // I can add tests to check the state of the kafka system producer
{code}
*Alternate approach* - Start the blocking call in a separate thread and use the existing MockProducer:
{code:borderStyle=solid}
MockProducer mock = new MockProducer(false)
KafkaSystemProducer p = new KafkaSystemProducer(producer=mock, ...)
p.send(test, msg1)
p.send(test, msg2)
Thread t = new Thread(new Runnable() {
    p.flush() // Blocks in a separate thread
    // <-- Asserts / intercepts go here ?? errorNext() does not bubble-up exceptions
})
t.start()
mock.completeNext()
mock.completeNext()
{code}
Drawback: we have to assert or intercept the exception thrown from outside the main thread. *Suggestions for the MockProducer*: 1. Provide an option to operate the producer in _concurrent mode_ - This means a send will not invoke the callback immediately. With every send call, a future task gets created with the callback and appended to the list of futures to be executed. Completion can be modified to make this work. This way the test thread and producer thread can be decoupled. 2. Alternatively, the deque can be made accessible so that the client can choose to invoke the completion of sends concurrently.
Enhance MockProducer for more sophisticated tests
Key: KAFKA-1897
URL: https://issues.apache.org/jira/browse/KAFKA-1897
Project: Kafka
Issue Type: Bug
Components: producer
Reporter: Navina Ramesh
Assignee: Jun Rao
Fix For: 0.8.2
Based on the experience of upgrading the kafka producer in Samza, we faced two main constraints when using MockProducer: 1.
The constructor requires a cluster specification, and the tools to create a test cluster are not exposed. They are available from TestUtils in Kafka; however, that jar is not published. This issue is currently being addressed in KAFKA-1861. 2. No support for testing a blocking client call. For example, flush in Samza blocks on the future returned by the latest send request. In order to test this, the MockProducer which buffers the send should run in a concurrent mode. There is currently no provision to do this. We want the MockProducer to buffer the send and then complete the callback concurrently while we wait for flush to unblock. We can write unit tests with improved coverage if we add support for concurrent execution of the MockProducer and the unit test thread.
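Suggestion (1) above - a concurrent mode where buffered sends are completed from a separate thread so a blocking flush() can unblock - can be sketched with JDK concurrency primitives alone. Every name here (`send`, `startSendThread`, `flushBlocking`) is hypothetical; this is not the real MockProducer API:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConcurrentMockSketch {

    private final BlockingQueue<CompletableFuture<Long>> pending = new LinkedBlockingQueue<>();
    private long nextOffset = 0;

    // Buffers the send; the returned future is completed later, by another thread.
    Future<Long> send(String msg) {
        CompletableFuture<Long> f = new CompletableFuture<>();
        pending.add(f);
        return f;
    }

    // Drains the buffered sends on a background thread, completing each
    // future after a simulated latency - the "concurrent mode" idea.
    void startSendThread(long delayMs) {
        Thread t = new Thread(() -> {
            CompletableFuture<Long> f;
            while ((f = pending.poll()) != null) {
                try {
                    Thread.sleep(delayMs); // simulate send latency
                } catch (InterruptedException e) {
                    return;
                }
                f.complete(nextOffset++); // fires the buffered "callback"
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Stand-in for a flush() that blocks on the last send's future.
    long flushBlocking(Future<Long> last) {
        try {
            return last.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            return -1; // timeout or failure
        }
    }

    public static void main(String[] args) {
        ConcurrentMockSketch mock = new ConcurrentMockSketch();
        mock.send("msg1");
        Future<Long> last = mock.send("msg2");
        mock.startSendThread(10);
        // Blocks in the test thread, then unblocks when the send thread
        // completes the future - no assertions needed off the main thread.
        System.out.println("flushed at offset " + mock.flushBlocking(last));
    }
}
```

The point of the design is that all assertions can stay on the main test thread: the test thread blocks in flush while a decoupled producer thread drives completion, which is exactly the gap the issue describes.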
Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
Kyle, Also, which reporter are you using for Graphite? Thanks, Jun On Mon, Jan 26, 2015 at 4:08 PM, Kyle Banker kyleban...@gmail.com wrote: This is still preliminary, but it looks as if the change to metric names for per-topic metrics (bytes/messages in/out) is preventing these metrics from being reported to Yammer/Graphite. If this isn't intentional, it should probably be addressed before release.
Re: What to do when file.rename fails?
Our logs are called longnumber.log. There are sysadmins that automatically clean old log files with a small cron job that does: find / -name '*.log' -ctime +30 -exec rm {} \; I suspect that allowing Kafka admins to choose a different suffix may also help with cases where files suddenly disappear (e.g. we regularly renamed Oracle's redo logs from the default of .log to .redo); I'd probably want to use .part for Kafka. Gwen On Mon, Jan 26, 2015 at 1:09 PM, Jay Kreps jay.kr...@gmail.com wrote: Also, I think I agree that shutting down is the right behavior. I think the real thing to do, though, is probably to debug that user's case and figure out if it is something inside Kafka that is leading to a double delete, or if the files are getting deleted by something else they don't know about. There may be a larger issue where we have bugs that can lead to double-deleting a log segment, which is currently innocuous, but if we made it shut down the server it would be quite severe. -Jay On Mon, Jan 26, 2015 at 1:06 PM, Jay Kreps jay.kr...@gmail.com wrote: Having a relative path and keeping data under /data in the Kafka distro would make sense. This would require some reworking of the shell scripts, though, as I think right now you can actually run Kafka from any directory and the cwd of the process will be whatever directory you start from. If we have a relative path in the config then the working directory will HAVE to be the Kafka directory. This works for the simple download case but may make some packaging harder for other use cases. -Jay On Mon, Jan 26, 2015 at 5:54 AM, Jaikiran Pai jai.forums2...@gmail.com wrote: Having looked at the logs the user posted, I don't think this specific issue has to do with the /tmp path. However, now that the /tmp path is being discussed, I think it's a good idea that we default the Kafka logs to a certain folder.
As Jay notes, it makes it very easy to just download and start the servers without having to fiddle with the configs when you are just starting out. Having said that, when I started out with Kafka, I found /tmp to be an odd place to default the path to. I expected it to be defaulted to a folder within the Kafka install, somewhere like a KAFKA_INSTALL_FOLDER/data/kafka-logs/ folder. Is that something we should do? -Jaikiran On Monday 26 January 2015 12:23 AM, Jay Kreps wrote: Hmm, but I don't think tmp gets cleaned while the server is running... The reason for using tmp was because we don't know which directory they will use and we don't want them to have to edit configuration for the simple out-of-the-box getting started tutorial. I actually do think that is important. Maybe an intermediate step we could do is just call out this setting in the quickstart so people know where data is going and know they need to configure it later... -Jay On Sun, Jan 25, 2015 at 9:32 AM, Joe Stein joe.st...@stealth.ly wrote: This feels like another type of symptom from people using /tmp/ for their logs. Personally, I would rather use /mnt/data or something, and if that doesn't exist on their machine we can raise an exception, or have no default and force the user to set it. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Jan 25, 2015 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote: I think you are right, good catch. It could be that this user deleted the files manually, but I wonder if there isn't some way that is a Kafka bug--e.g. if multiple types of retention policies kick in at the same time, do we synchronize that properly? -Jay On Sat, Jan 24, 2015 at 9:26 PM, Jaikiran Pai jai.forums2...@gmail.com wrote: Hi Jay, I spent some more time on this today and went back to the original thread which brought up the issue with file leaks [1].
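Gwen's glob clash is easy to demonstrate with java.nio's PathMatcher. The `.part` suffix here is her suggestion from this thread, not an existing Kafka setting, and the class name is made up:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class SuffixClash {

    // Mimics the cron job's pattern: a cleanup matching "*.log" will happily
    // match Kafka segment files, since segments are named <offset>.log.
    static boolean matchedByCleanup(String fileName) {
        PathMatcher cleanupGlob = FileSystems.getDefault().getPathMatcher("glob:*.log");
        return cleanupGlob.matches(Paths.get(fileName));
    }

    public static void main(String[] args) {
        System.out.println(matchedByCleanup("00000000000068818668.log"));  // caught by the cron job
        System.out.println(matchedByCleanup("00000000000068818668.part")); // out of the glob's reach
    }
}
```

A configurable segment suffix would keep Kafka's data files out of reach of generic `*.log` housekeeping scripts, the same way renaming Oracle redo logs to `.redo` did.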
Re: Review Request 28769: Patch for KAFKA-1809
On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: Thanks for the patch. Looks promising. Some comments. 1. I overlooked this when I suggested the new broker format in ZK. This means that we will need to upgrade all consumer clients before we can turn on the flag of using the new protocol on the brokers, which may not be convenient. Now, I think your earlier approach is probably better because of this? Yeah, this will break the ZK consumer :( I'm having second thoughts about the use.new.wire.protocol flag. After finishing the upgrade, it will have to be true on all brokers. Then during the next upgrade you'll need to set it back to false, and then back to true again... Perhaps we need something like wire.protocol.version that accepts values like 0.8.2, 0.8.3, 0.9, etc.? This way you won't have to change it twice on each upgrade. Thoughts? On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: core/src/main/scala/kafka/javaapi/TopicMetadata.scala, lines 55-65 https://reviews.apache.org/r/28769/diff/12/?file=820424#file820424line55 Technically, this is an API change since it's used in javaapi.SimpleConsumer. The caller will now get a different type in the response. An alternative is to leave Broker as it is and create something like BrokerProfile to include all endpoints. Perhaps we need to discuss this in the WIP a bit: whether it's better to break the API in order to use a more meaningful class name, or not break the API and stick with a lousy name. Yeah, I think I mentioned that at one of the discussions. We decided we don't want to support the new security protocols on the old clients (which will be deprecated by the time this is included in a release). We definitely don't want to demand an upgrade of clients during a broker upgrade - but this API breakage won't force that. It just means that if you build a SimpleConsumer, the highest version you can depend on is 0.8.2.
Simple Consumers built on old versions will keep working (since we kept the wire protocol compatible); they will simply deserialize the TopicMetadataResponse into Broker. Upgrades will work as long as no one changes dependencies and rebuilds clients, which sounds fairly reasonable to me. I'll bring it up on the mailing list.

On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: core/src/main/scala/kafka/server/KafkaConfig.scala, lines 185-186 https://reviews.apache.org/r/28769/diff/12/?file=820431#file820431line185 I am thinking about how we should name this field. Since this is only needed for internal communication among brokers, perhaps we should name it something like use.new.intra.broker.wire.protocol. My next question is what happens if we have intra-broker protocol changes in 2 releases. Do we want to use different names so that we can enable each change independently? An alternative is to keep the same property name, with the meaning "turn on the intra-broker changes introduced in this release only". The latter implies that one can't skip upgrading the intermediate release. So my feeling is that the former will probably be better? Perhaps we can bring this up in the WIP discussion.

I also had thoughts about this (see my reply to the first comment). Let's discuss on the mailing list.

On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: kafka-patch-review.py, line 10 https://reviews.apache.org/r/28769/diff/12/?file=820463#file820463line10 Are the changes in this file intended?

I rebased and it looks like a bunch of stuff got included by mistake. Not sure if I did something wrong or it's an issue in the patch review tool. Anyway, I'll clean it up.

On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: system_test/utils/kafka_system_test_utils.py, lines 389-396 https://reviews.apache.org/r/28769/diff/12/?file=820465#file820465line389 I thought protocol is specified separately, and not in broker.list?

Yes, it is separate.
I think these properties are reused to start the brokers too (as listeners), but I'll have to double check here.

- Gwen

This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28769/#review69281

On Jan. 14, 2015, 2:16 a.m., Gwen Shapira wrote: (Updated Jan. 14, 2015, 2:16 a.m.) Review request for kafka. Bugs: KAFKA-1809 https://issues.apache.org/jira/browse/KAFKA-1809 Repository: kafka

Description: trivial change to add byte serializer to ProducerPerformance; patched by Jun Rao. First commit of refactoring: changed TopicMetadata to include broker endpoints and fixed a few unit tests. Fixing systest and support for
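For concreteness, the version-valued setting floated above might look something like this in server.properties (the property name and values here are hypothetical, sketching the idea rather than the patch as posted):

```properties
# Hypothetical replacement for the boolean use.new.wire.protocol flag:
# brokers speak the intra-broker protocol of the named release, so a
# rolling upgrade only changes this value once per release.
wire.protocol.version=0.8.2
```

With a boolean flag, every upgrade needs two flips (false during the roll, true afterwards); a version value only ever moves forward.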
[jira] [Updated] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2
[ https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-1729:
--
Attachment: KAFKA-1782-doc-v3.patch

Incorporated comments. One issue we have with the currently linked wiki (https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka) is that it was written with Java users in mind. I will need to update it to Scala and mention that we do not support creating the v1 OffsetCommitRequest from javaapi (KAFKA-1870). I think we should have added an additional constructor to javaapi.OffsetCommitRequest to construct v1.

add doc for Kafka-based offset management in 0.8.2
--
Key: KAFKA-1729
URL: https://issues.apache.org/jira/browse/KAFKA-1729
Project: Kafka
Issue Type: Sub-task
Reporter: Jun Rao
Assignee: Joel Koshy
Fix For: 0.8.2
Attachments: KAFKA-1782-doc-v1.patch, KAFKA-1782-doc-v2.patch, KAFKA-1782-doc-v3.patch

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
+1 (non-binding). Verified source package, unit tests, release build, topic deletion, compaction and random testing.

On Mon, Jan 26, 2015 at 6:14 AM, Neha Narkhede n...@confluent.io wrote: +1 (binding) Verified keys, quick start, unit tests.

On Sat, Jan 24, 2015 at 4:26 PM, Joe Stein joe.st...@stealth.ly wrote: That makes sense, thanks!

On Sat, Jan 24, 2015 at 7:00 PM, Jay Kreps jay.kr...@gmail.com wrote: But I think the flaw in trying to guess what kind of serializer they will use is when we get it wrong. Basically, let's say we guess String. Say 30% of the time we will be right and we will save the two configuration lines. 70% of the time we will be wrong, and the user gets a super cryptic ClassCastException: "xyz cannot be cast to [B" (because [B is how Java chooses to display the byte array class, just to up the pain). Then they figure out how to subscribe to our mailing list and email us the cryptic exception, and then we explain how we helpfully set these properties for them to save them time. :-) https://www.google.com/?gws_rd=ssl#q=kafka+classcastexception+%22%5BB%22 I think we basically did this experiment with the old clients, and the conclusion is that serialization is something you have to think about to use Kafka; trying to guess just makes things worse.

-Jay

On Sat, Jan 24, 2015 at 2:51 PM, Joe Stein joe.st...@stealth.ly wrote: Maybe. I think the StringSerializer could look more like a typical type of message. Instead of the encoding being a property, it would more typically just be written in the bytes.

On Sat, Jan 24, 2015 at 12:12 AM, Jay Kreps jay.kr...@gmail.com wrote: I don't think so--see if you buy my explanation. We previously defaulted to the byte array serializer, and it was a source of unending frustration and confusion. Since it wasn't a required config, people just went along plugging in whatever objects they had, thinking that changing the parametric types would somehow help.
Then they would get a class cast exception and assume our stuff was somehow busted, not realizing we had helpfully configured, under the covers, a type different from what they were passing in. So I think it is actually good for people to think: how am I serializing my data? Getting that exception will make them ask that question, right?

-Jay

On Fri, Jan 23, 2015 at 9:06 PM, Joe Stein joe.st...@stealth.ly wrote: Should value.serializer in the new java producer be defaulted to Array[Byte]? I was working on testing some upgrade paths and got this:

! return exception in callback when buffer cannot accept message
ConfigException: Missing required configuration "value.serializer" which has no default value. (ConfigDef.java:124)
org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:235)
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:129)
ly.stealth.testing.BaseSpec$class.createNewKafkaProducer(BaseSpec.scala:42)
ly.stealth.testing.KafkaSpec.createNewKafkaProducer(KafkaSpec.scala:36)
ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:175)
ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:170)

On Fri, Jan 23, 2015 at 5:55 PM, Jun Rao j...@confluent.io wrote: This is a reminder that the deadline for the vote is this Monday, Jan 26, 7pm PT. Thanks, Jun

On Wed, Jan 21, 2015 at 8:28 AM, Jun Rao j...@confluent.io wrote: This is the second candidate for release of Apache Kafka 0.8.2.0. There have been some changes since the 0.8.2 beta release, especially in the new java producer api and jmx mbean names. It would be great if people can test this out thoroughly.
Release Notes for the 0.8.2.0 release: https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html

*** Please download, test and vote by Monday, Jan 26th, 7pm PT ***

Kafka's KEYS file containing PGP keys we use to sign the release: http://kafka.apache.org/KEYS (in addition to the md5, sha1 and sha2 (SHA256) checksums).

* Release artifacts to be voted upon (source and binary): https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/
* Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/
* scala-doc
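Returning to Jay's point about why guessing a default serializer backfires: the sketch below (hypothetical types, not the real Kafka classes) shows how a silently defaulted byte-array serializer only fails at runtime, with exactly the cryptic cast-to-[B error, because Java generics are erased:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins, not the real org.apache.kafka classes.
interface Serializer<T> {
    byte[] serialize(T value);
}

class ByteArraySerializer implements Serializer<byte[]> {
    public byte[] serialize(byte[] value) {
        return value; // pass-through; only valid for actual byte arrays
    }
}

public class SerializerDefaultDemo {

    // A producer-like send() that silently defaults to the byte-array
    // serializer regardless of its type parameter V -- the old behavior
    // described above. The unchecked cast compiles fine; the mismatch
    // only surfaces at runtime, deep inside the producer.
    @SuppressWarnings("unchecked")
    static <V> byte[] send(V value) {
        Serializer<V> serializer = (Serializer<V>) new ByteArraySerializer();
        return serializer.serialize(value); // ClassCastException unless V is byte[]
    }

    public static void main(String[] args) {
        // Fine when the caller really has bytes:
        send("hello".getBytes(StandardCharsets.UTF_8));

        // A String only fails here, at runtime, with the
        // "cannot be cast to [B" message users kept reporting:
        try {
            send("hello");
        } catch (ClassCastException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Making value.serializer a required config moves this error from an obscure runtime cast failure to an explicit ConfigException at construction time.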
[jira] [Commented] (KAFKA-1792) change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments
[ https://issues.apache.org/jira/browse/KAFKA-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292368#comment-14292368 ]

Joe Stein commented on KAFKA-1792:
--
[~Dmitry Pekar] can you write up a design for what you have already coded and post it to https://cwiki.apache.org/confluence/display/KAFKA/KIP-6+-+New+reassignment+partition+logic+for+re-balancing please. From there we can move discussions to the mailing list.

change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments
--
Key: KAFKA-1792
URL: https://issues.apache.org/jira/browse/KAFKA-1792
Project: Kafka
Issue Type: Sub-task
Components: tools
Reporter: Dmitry Pekar
Assignee: Dmitry Pekar
Fix For: 0.8.3
Attachments: KAFKA-1792.patch, KAFKA-1792_2014-12-03_19:24:56.patch, KAFKA-1792_2014-12-08_13:42:43.patch, KAFKA-1792_2014-12-19_16:48:12.patch, KAFKA-1792_2015-01-14_12:54:52.patch, generate_alg_tests.txt, rebalance_use_cases.txt

The current implementation produces a fair replica distribution between the specified list of brokers. Unfortunately, it doesn't take the current replica assignment into account. So if we have, for instance, 3 brokers id=[0..2] and are going to add a fourth broker id=3, --generate will create an assignment config that redistributes replicas fairly across brokers [0..3] in the same way as if those partitions were created from scratch. It will not take the current replica assignment into consideration and accordingly will not try to minimize the number of replica moves between brokers. As proposed by [~charmalloc], this should be improved. The output of the improved --generate algorithm should satisfy the following requirements:

- fairness of replica distribution - every broker will have R or R+1 replicas assigned;
- minimum of reassignments - the number of replica moves between brokers will be minimal.

Example.
Consider the following replica distribution across brokers [0..3] (we just added brokers 2 and 3):

- broker:   0, 1, 2, 3
- replicas: 7, 6, 0, 0

The new algorithm will produce the following assignment:

- broker:   0, 1, 2, 3
- replicas: 4, 3, 3, 3
- moves:   -3, -3, +3, +3

It is fair, and the number of moves is 6, which is minimal for the specified initial distribution. The scope of this issue is:

- design an algorithm matching the above requirements;
- implement this algorithm and unit tests;
- test it manually using different initial assignments.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
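The fairness target in the example above can be sketched in a few lines (an illustration of the requirement, not the actual patch under review): every broker ends up with R or R+1 replicas, and the minimal number of moves is the sum of the positive deltas.

```java
import java.util.Arrays;

public class RebalanceSketch {

    // Fair per-broker replica counts: total/n everywhere, with the
    // remainder spread as +1s. To keep moves small, this sketch hands
    // the R+1 slots to the brokers that currently hold the most replicas.
    static int[] fairTargets(int[] current) {
        int total = Arrays.stream(current).sum();
        int n = current.length;
        int base = total / n, extra = total % n;

        Integer[] idx = new Integer[n];
        for (int i = 0; i < n; i++) idx[i] = i;
        Arrays.sort(idx, (a, b) -> current[b] - current[a]); // descending by load

        int[] target = new int[n];
        for (int i = 0; i < n; i++) target[idx[i]] = base + (i < extra ? 1 : 0);
        return target;
    }

    // Minimal number of replica moves = sum of positive deltas
    // (every replica gained by one broker is lost by another).
    static int moves(int[] current, int[] target) {
        int m = 0;
        for (int i = 0; i < current.length; i++) {
            m += Math.max(0, target[i] - current[i]);
        }
        return m;
    }

    public static void main(String[] args) {
        int[] current = {7, 6, 0, 0};        // brokers 0..3; 2 and 3 just added
        int[] target = fairTargets(current);
        System.out.println(Arrays.toString(target)
                + " moves=" + moves(current, target));
    }
}
```

For the JIRA's example this yields targets 4, 3, 3, 3 and 6 moves, matching the distribution given above; the real work in KAFKA-1792 is choosing which partitions move, not just how many.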