Re: Changing the Replication Factor
Hi,

re: http://grokbase.com/t/kafka/users/13c2rdc2yj/changing-the-replication-factor

Is it possible to change the replication factor now, or is it still in the pipeline?

Thanks,
Sagar
Re: Changing the Replication Factor
Hi Sagar,

It's not there in release versions or trunk. Here is the JIRA for it: https://issues.apache.org/jira/browse/KAFKA-1543

-Harsha

On Thu, Jul 31, 2014, at 05:22 AM, Sagar Khanna wrote: Is it possible to change the replication factor now, or is it still in the pipeline?
Re: Changing the Replication Factor
It is possible to change the replication factor by using the partition reassignment tool: http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor

It requires a manually edited JSON file; KAFKA-1543 tracks making this easier with a separate command-line tool.

Guozhang

On Thu, Jul 31, 2014 at 7:55 AM, Harsha ka...@harsha.io wrote: Hi Sagar, it's not there in release versions or trunk. Here is the JIRA for it: https://issues.apache.org/jira/browse/KAFKA-1543
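To make the "manually edited JSON file" step concrete: the reassignment tool consumes a JSON document listing the desired replica set per partition. A minimal sketch, assuming a hypothetical topic "test" with a single partition currently on broker 0 being expanded to replicas on brokers 0 and 1 (the topic name and broker ids are illustrative, not taken from the thread):

```python
import json

# Reassignment plan in the format kafka-reassign-partitions.sh expects:
# listing a partition with a larger replica list increases its
# replication factor.
reassignment = {
    "version": 1,
    "partitions": [
        {"topic": "test", "partition": 0, "replicas": [0, 1]}
    ],
}

# Write the plan to a file to hand to the tool.
with open("increase-replication-factor.json", "w") as f:
    json.dump(reassignment, f, indent=2)

print(json.dumps(reassignment))
```

The file would then be passed to the tool, along the lines of `bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute`, per the documentation page linked above.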
Re: Most common kafka client consumer implementations?
Hi Jim,

Whether to use the high-level or the simple consumer depends on your use case. If you need to manually manage partition assignments among your consumers, commit your offsets somewhere other than ZK, or avoid automatic rebalancing of consumers upon failures, etc., use the simple consumer; otherwise use the high-level consumer. From your description of pulling a batch of messages, it seems you are currently using the simple consumer.

Supposing you are using the high-level consumer, to achieve at-least-once you can basically do something like:

message = consumer.iter.next()
process(message)
consumer.commit()

which is effectively the same as option 2 for the simple consumer. Of course, doing so has the heavy overhead of one commit per message; you can also do option 1, at the cost of duplicates, which is tolerable for at-least-once.

Guozhang

On Wed, Jul 30, 2014 at 8:25 PM, Jim jimi...@gmail.com wrote: Curious on a couple questions... Are most people (are you?) using the simple consumer vs the high-level consumer in production?

What is the common processing paradigm for maintaining a full pipeline for Kafka consumers for at-least-once messaging? E.g. you pull a batch of 1000 messages and:

option 1: you wait for the slowest worker to finish working on its message; when you get back 1000 acks internally, you commit your offset and pull another batch

option 2: you feed your workers n msgs at a time in sequence and move your offset up as you work through your batch

option 3: you maintain a full stream of 1000 messages ideally, and as you get acks back from your workers you see if you can move your offset up in the stream to pull n more messages to fill up your pipeline, so you're not blocked by the slowest consumer (probability wise)

any good docs or articles on the subject would be great, thanks!
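The process-then-commit ordering in Guozhang's loop is exactly what makes it at-least-once: if the consumer dies after processing a message but before committing its offset, the message is redelivered on restart rather than lost. A minimal sketch of that guarantee, using a plain in-memory stand-in (the `FakeConsumer` class is illustrative, not a Kafka API):

```python
class FakeConsumer:
    """Stand-in for a high-level consumer that tracks a committed offset."""

    def __init__(self, messages):
        self.messages = messages
        self.committed = 0   # offset consumption resumes from after a restart
        self.position = 0    # in-memory read position

    def next(self):
        msg = self.messages[self.position]
        self.position += 1
        return msg

    def commit(self):
        self.committed = self.position

    def restart(self):
        # After a crash, the uncommitted read position is lost; we resume
        # from the last committed offset.
        self.position = self.committed


processed = []

def process(msg):
    processed.append(msg)

consumer = FakeConsumer(["m0", "m1", "m2"])

# Normal at-least-once iteration: process first, commit second.
process(consumer.next())   # m0
consumer.commit()

# Crash after processing m1 but before committing its offset...
process(consumer.next())   # m1
consumer.restart()

# ...so m1 is delivered and processed again on resume: a duplicate,
# but never a lost message. Committing *before* processing would give
# the opposite (at-most-once) behavior.
process(consumer.next())   # m1 again
consumer.commit()
process(consumer.next())   # m2
consumer.commit()

print(processed)  # every message seen at least once; m1 seen twice
```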
Re: Issue with unit testing Kafka on 0.8.1.1 and scala 2.9.2
I wrote my Java code based on the Kafka unit testing example code. Have you got it working from Java?

Regards,
Sathya

On Wed, Jul 30, 2014 at 9:07 PM, Jun Rao jun...@gmail.com wrote: Perhaps you can follow some examples in the Kafka unit tests? Thanks, Jun

On Tue, Jul 29, 2014 at 7:32 PM, Sathyanarayanan Nagarajan sathy...@gmail.com wrote: Hi, I have been trying to run the Kafka server using TestUtils for my unit tests. While the topic gets created, I'm getting the following error:

error when handling request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:1;CorrelationId:9;ClientId:id_0-host_localhost-port_9000;Leaders:id:0,host:localhost,port:9000;PartitionState:(netopic,0) - (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:2),AllReplicas:0,1) (kafka.server.KafkaApis:103)

It creates topics and I can check for existence. Output from my program:

All topics Map(new-topic - {})
topic exists true

Here's how I'm creating the ZooKeeper client and topic:

zkClient = new ZkClient(zookeeper.connectString(), zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, topic, 1, 2, AdminUtils.createTopic$default$5());

Kafka version:

<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>

Let me know if anyone has faced this issue and any resolution for the same.

Regards,
Sathya
Re: [DISCUSS] Kafka Security Specific Features
Can we get the info on targeted release dates for the 0.8.2 release and the 0.9 release, for our planning purposes? Thanks.

Raja.

On Wed, Jul 30, 2014 at 7:27 PM, Joe Stein joe.st...@stealth.ly wrote: The 0.8.2 release will not have the patch inside of it. Trunk already has a lot inside of it as a point release. The patch also doesn't account for all of the requirements that all of the stakeholders need/want for the feature. Instead of releasing something that is useful but only for some, it is better to spend the time to get it right for everyone. We are going to have it in the 0.9 release (possibly also with authorization, encryption and more of the security features) then. What we will do is keep the patch rebased against trunk and then the 0.8.2 branch (once we get to that point) so that folks can apply it to the 0.8.2 release and do a build from src. When we get to that point I can create a write-up or something if folks find problems doing it.

Joe Stein, Founder, Principal Consultant, Big Data Open Source Security LLC, http://www.stealth.ly, Twitter: @allthingshadoop

On Wed, Jul 30, 2014 at 7:10 PM, Calvin Lei ckp...@gmail.com wrote: Yeah, I just saw that. Looking forward to the prod release of 0.8.2.

On Wed, Jul 30, 2014 at 11:01 AM, Rajasekar Elango rela...@salesforce.com wrote: We implemented security features on an older snapshot version of 0.8 Kafka, but Joe Stein's organization rebased it to the latest version of Kafka, available at https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 Thanks, Raja.

On Tue, Jul 29, 2014 at 10:54 PM, Calvin Lei ckp...@gmail.com wrote: Raja, which Kafka version is your security enhancement based on? Thanks, Cal

On Wed, Jul 23, 2014 at 5:01 PM, Chris Neal cwn...@gmail.com wrote: Pramod, I got that same error when following the configuration from Raja's presentation earlier in this thread.
If you'll notice the usage for kafka-console-producer.sh, it is slightly different, which is also slightly different from the Scala code for ConsoleProducer. :) When I changed this:

bin/kafka-console-producer.sh --broker-list n5:9092:true --topic test

to this:

bin/kafka-console-producer.sh --broker-list n5:9092 --secure --client.security.file config/client.security.properties --topic test

I was able to push messages to the topic, although I got a WARN about the property "topic" not being valid, even though it is required. Also, the producer reported this warning to me:

[2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context (kafka.network.security.SecureAuth$)

and the broker gave me this:

[2014-07-23 20:45:24,114] INFO begin ssl handshake for n5.example.com/192.168.1.144:48817//192.168.1.144:9092 (kafka.network.security.SSLSocketChannel)
[2014-07-23 20:45:24,374] INFO finished ssl handshake for n5.example.com/192.168.1.144:48817//192.168.1.144:9092 (kafka.network.security.SSLSocketChannel)
[2014-07-23 20:45:24,493] INFO Closing socket connection to n5.example.com/192.168.1.144. (kafka.network.Processor)
[2014-07-23 20:45:24,555] INFO begin ssl handshake for n5.example.com/192.168.1.144:48818//192.168.1.144:9092 (kafka.network.security.SSLSocketChannel)
[2014-07-23 20:45:24,566] INFO finished ssl handshake for n5.example.com/192.168.1.144:48818//192.168.1.144:9092 (kafka.network.security.SSLSocketChannel)

It's like it did the SSL piece twice. :) Subsequent puts to the topic did not exhibit this behavior though:

root@n5[937]:~/kafka_2.10-0-8-2-0.1.0.0 bin/kafka-console-producer.sh --broker-list n5:9092 --secure --client.security.file config/client.security.properties --topic test
[2014-07-23 20:45:17,530] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
1
[2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context (kafka.network.security.SecureAuth$)
2
3
4

Consuming worked with these options:

root@n5[918]:~/kafka_2.10-0-8-2-0.1.0.0 bin/kafka-console-consumer.sh --topic test --zookeeper n5:2181 --from-beginning --security.config.file config/client.security.properties
1
2
3
4
^CConsumed 5 messages

I hope that helps!
Chris

On Tue, Jul 22, 2014 at 2:10 PM, Pramod Deshmukh dpram...@gmail.com wrote: Anyone getting this issue? Is it something related to environment, or is it the code? The producer works fine when run in secure=false (no security) mode. pdeshmukh$
Re: Issues with metrics collection
Thanks for the response, Jun and Otis.

Jun, you're right. Only the leaders are reporting. One of the brokers is not the leader for any of the partitions. I found the following info on the documentation page: "leader is the node responsible for all reads and writes for the given partition. Each node will be the leader for a *randomly* selected portion of the partitions." I was under the assumption that leadership for partitions is distributed across all the brokers. *Is there a way to make all the brokers take leadership as evenly as possible?* We're using a custom app for metrics collection.

Otis, we're using kafka_2.9.2-0.8.1

Thanks.

On Thu, Jul 31, 2014 at 6:26 AM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, you may also want to share the version of Kafka. We have SPM agents for Kafka 0.7.x and 0.8.x and we haven't seen that. Otis -- Performance Monitoring * Log Analytics * Search Analytics, Solr Elasticsearch Support, http://sematext.com/

On Wed, Jul 30, 2014 at 10:12 PM, Kiran Nagasubramanian nkira...@gmail.com wrote: Hello, we're collecting the JMX metrics from the Kafka brokers and seeing a couple of issues. Could someone please throw some light if you've come across something similar?

1) We have a 3-broker Kafka cluster, and when we try to collect metrics like messages in per sec, bytes in per sec, etc., we get values of 0 for one of the three brokers, but proper values for metrics like heap memory usage for all the brokers. When we restart the cluster, the same or some other broker behaves in a similar way. We're seeing similar behavior in another cluster as well.

2) We're logging the time it takes to collect the metrics. The collection time seems to increase over time and crosses a minute within a couple of days; it's on the order of 1 or 2 seconds when we start the cluster.

Thanks.
Zookeeper offset
Kafka Team,

In the integration environment, Kafka and ZooKeeper are running under supervision. Once in a while, when ZooKeeper and Kafka are shut down and started back again, the consumers are not able to read data from the topic. I am not seeing any exceptions in the log. The consumer offset checker utility does show a lag for the consumer group. Does that mean that when Kafka/ZooKeeper are shut down abruptly, it's possible that the ZooKeeper data directories are not committed with the proper offset, or got corrupted? I tried with new consumer groups too, and with simple consumers. After that point, I am not able to retrieve the data from the topic. How do I recover the data? This is a critical problem and any help is really appreciated.

Thanks!

This email and any files transmitted with it are confidential, proprietary and intended solely for the individual or entity to whom they are addressed. If you have received this email in error please delete it immediately.
Re: Issues with metrics collection
On Thu, Jul 31, 2014 at 11:09:03AM -0700, Kiran Nagasubramanian wrote: I was under the assumption that leadership for partitions is distributed across all the brokers. *Is there a way to make all the brokers take leadership as evenly as possible?*

It is distributed, although it may not be an optimal distribution. I.e., as long as you have more than just a handful of topics, the leader counts across the brokers should be relatively even. If you do a rolling bounce of the cluster, the leader counts can get skewed, in which case you can run a preferred replica leader election operation: http://kafka.apache.org/documentation.html#basic_ops_leader_balancing
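The preferred replica leader election operation mentioned above is run with a stock tool shipped in the Kafka bin/ directory; a sketch of the invocation against a cluster whose ZooKeeper is at localhost:2181 (the host is illustrative — run it against your own ensemble, and check the tool's usage for your version):

```shell
# Re-elect the preferred (first-listed) replica as leader for all
# partitions, evening out leader counts after a rolling bounce.
bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
```

The tool also accepts a JSON file restricting the election to specific partitions, per the linked documentation section.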
Re: Unable to read from the beginning using High level consumer API
Set auto.offset.reset to smallest.

On Thu, Jul 31, 2014 at 08:25:35PM +0000, Srividhya Shanmugam wrote: Kafka Team,

I am using the high-level consumer API as shown below to read contents from the topic:

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("zookeeper.session.timeout.ms", "1");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("consumer.timeout.ms", "12");
props.put("group.id", "TEST123");
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("TEST", new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("TEST");
// now launch all the threads
ThreadPoolExecutor executor = resource.getExecutor();
// now create an object to consume the messages
for (final KafkaStream<byte[], byte[]> stream : streams) {
    TestTask task = new TestTask(stream);
    executor.submit(task);
}

And the TestTask just prints the messages. The Kafka logger shows the statement below:

Consumer APP51_DFGHSFV1-1406836437053-9ed3b6a7 selected partitions : :0: fetched offset = -1: consumed offset = -1,:1: fetched offset = -1: consumed offset = -1 - [APP51_DFGHSFV1-1406836437053-9ed3b6a7],

Even though the fetched and consumed offsets display -1, I am not getting the messages from the beginning. The retention window policy is set as log.retention.hours=168. If I produce new messages, then those messages are consumed and I can see the logged statements. If I use the simple consumer API and specify the starting offset as 0, then I am able to read from the beginning. Are there any settings that would enable a new consumer group to read messages from the beginning?

Thanks,
Srividhya
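Put together, the fix for the high-level consumer is two properties: a group.id with no offsets already stored in ZooKeeper, plus auto.offset.reset=smallest so the group starts from the earliest retained offset instead of the latest. A sketch of the relevant consumer settings (the group name is illustrative):

```
# a consumer group with no offsets stored in ZooKeeper yet
group.id=TEST124
# with no stored offset, start from the earliest retained offset
# instead of the default "largest" (newest)
auto.offset.reset=smallest
```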
Re: Apache Kafka error on windows - Could not find or load main class QuorumPeerMain
How did you start ZK?

Thanks,
Jun

On Wed, Jul 30, 2014 at 10:12 PM, pradeep.si...@wipro.com wrote: Hi, yes I tried. I added a CLASSPATH environment variable with the value D:\Projects\kafka_2.8.0-0.8.1.1\libs\zookeeper-3.3.4.jar and tried to start Kafka, but I am still getting the same error. Thanks, Pradeep Simha, Technical Lead

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Thursday, July 31, 2014 10:00 AM
To: users@kafka.apache.org
Subject: Re: Apache Kafka error on windows - Could not find or load main class QuorumPeerMain

Is the zookeeper jar in your classpath?

Thanks,
Jun

On Wed, Jul 30, 2014 at 6:13 AM, pradeep.si...@wipro.com wrote: Hi Team, I just downloaded Kafka 2.8.0 from the Apache website and am trying to set it up using the instructions given on the website. But when I try to start the ZooKeeper server, I get the error below:

Error: Could not find or load main class org.apache.zookeeper.server.quorum.QuorumPeerMain

My environment is Windows 7, 64-bit. I tried to follow the e-mail chain below: [Apache Email Chain][1], but it still has the same issue. Can anyone guide me on this? I am very new to this and couldn't find much information on Google or the Apache Kafka email chain.

[1]: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3CCAlupvhxcttkqfe59h_sn-6jzrscqwrgv0twsjz-hjv7ao19...@mail.gmail.com%3E

Thanks, Pradeep Simha, Technical Lead, Cell: +91-8884382615, E-mail: pradeep.si...@wipro.com

The information contained in this electronic message and any attachments to this message are intended for the exclusive use of the addressee(s) and may contain proprietary, confidential or privileged information. If you are not the intended recipient, you should not disseminate, distribute or copy this e-mail. Please notify the sender immediately and destroy all copies of this message and any attachments. WARNING: Computer viruses can be transmitted via email. The recipient should check this email and any attachments for the presence of viruses.
The company accepts no liability for any damage caused by any virus transmitted by this email. www.wipro.com
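For anyone hitting the same QuorumPeerMain error: one hand-check (a sketch under the assumption that the .bat scripts simply aren't picking up the ZooKeeper jar — paths and versions below mirror the ones in the thread and are illustrative):

```
set CLASSPATH=%CLASSPATH%;D:\Projects\kafka_2.8.0-0.8.1.1\libs\zookeeper-3.3.4.jar
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
```

Note the distinction raised in the thread: the CLASSPATH must be in effect for the ZooKeeper start script, not just when starting Kafka itself.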
Re: undesirable log retention behavior
What version of Kafka are you using? Have you tried log.retention.bytes? Whichever comes first (TTL or total bytes) should do what you are looking for, if I understand you right. http://kafka.apache.org/documentation.html#brokerconfigs

Joe Stein, Founder, Principal Consultant, Big Data Open Source Security LLC, http://www.stealth.ly, Twitter: @allthingshadoop

On Jul 31, 2014 6:52 PM, Steven Wu steve...@netflix.com.invalid wrote: It seems that log retention is purely based on the last touch/modified timestamp. This is undesirable for code pushes in AWS/cloud. E.g., let's say the retention window is 24 hours, disk size is 1 TB, and disk utilization is 60% (600 GB). When a new instance comes up, it fetches log files (600 GB) from peers. Those log files all have newer timestamps, so they won't be purged until 24 hours later. Note that during the first 24 hours, new msgs (another 600 GB) continue to come in. This can cause a disk-full problem without any intervention. With this behavior, we have to keep disk utilization under 50%. Could the last-modified timestamp be inserted into the file name when rolling over log files? Then Kafka could check the file name for the timestamp. Does this make sense? Thanks, Steven
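The two retention limits Joe mentions are broker settings in server.properties; whichever threshold a log segment hits first triggers deletion. A sketch with illustrative numbers (note that log.retention.bytes is a per-partition limit, so total disk use is roughly partitions × this value, plus active segments):

```
# delete segments older than 24 hours...
log.retention.hours=24
# ...or once a partition's log exceeds ~100 GB, whichever comes first
log.retention.bytes=107374182400
```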
Re: undesirable log retention behavior
log.retention.bytes can somewhat help, but it is cumbersome to use because it is a per-topic config for a per-partition limit. There was an earlier thread regarding a global bytes limit; that would work well for my purpose of avoiding disk full: https://issues.apache.org/jira/browse/KAFKA-1489

On Thu, Jul 31, 2014 at 7:39 PM, Joe Stein joe.st...@stealth.ly wrote: What version of Kafka are you using? Have you tried log.retention.bytes? Whichever comes first (TTL or total bytes) should do what you are looking for, if I understand you right. http://kafka.apache.org/documentation.html#brokerconfigs