Re: Changing the Replication Factor

2014-07-31 Thread Sagar Khanna
Hi,

 re: 
 http://grokbase.com/t/kafka/users/13c2rdc2yj/changing-the-replication-factor
Is it possible to change the replication factor now or is it still in the 
pipeline?

Thanks,
Sagar



Re: Changing the Replication Factor

2014-07-31 Thread Harsha
Hi Sagar,
   It's not there in released versions or on trunk. Here is the JIRA
   for it: https://issues.apache.org/jira/browse/KAFKA-1543.
-Harsha

On Thu, Jul 31, 2014, at 05:22 AM, Sagar Khanna wrote:
 Hi,
 
  re: 
  http://grokbase.com/t/kafka/users/13c2rdc2yj/changing-the-replication-factor
 Is it possible to change the replication factor now or is it still in the
 pipeline?
 
 Thanks,
 Sagar
 


Re: Changing the Replication Factor

2014-07-31 Thread Guozhang Wang
It is possible to change the replication factor by using the partition
reassignment tool:

http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor

but it requires a manually edited JSON file; KAFKA-1543 aims to make this
easier with a dedicated command-line tool.
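
For example, the JSON file the tool takes and the command to run it look
roughly like this (topic name, broker ids, and file name are illustrative):

{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[1,2,3]}]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-replication-factor.json --execute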

Guozhang


On Thu, Jul 31, 2014 at 7:55 AM, Harsha ka...@harsha.io wrote:

 Hi Sagar,
    It's not there in released versions or on trunk. Here is the JIRA
    for it: https://issues.apache.org/jira/browse/KAFKA-1543.
 -Harsha

 On Thu, Jul 31, 2014, at 05:22 AM, Sagar Khanna wrote:
  Hi,
 
   re:
 http://grokbase.com/t/kafka/users/13c2rdc2yj/changing-the-replication-factor
  Is it possible to change the replication factor now or is it still in the
  pipeline?
 
  Thanks,
  Sagar
 




-- 
-- Guozhang


Re: Most common Kafka client consumer implementations?

2014-07-31 Thread Guozhang Wang
Hi Jim,

Whether to use the high-level or the simple consumer depends on your use
case. If you need to manually manage partition assignments among your
consumers, commit your offsets somewhere other than ZK, or avoid automatic
rebalancing of consumers upon failures, etc., you would use the simple
consumer; otherwise use the high-level consumer.

From your description of pulling a batch of messages, it seems you are
currently using the simple consumer. If you were using the high-level
consumer, to achieve at-least-once you could basically do something like:

message = consumer.iter.next()
process(message)
consumer.commit()

which is effectively the same as option 2 with a simple consumer. Of
course, doing so carries the heavy overhead of one commit per message; you
can also do option 1, at the cost of duplicates, which is tolerable for
at-least-once.
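
In Java, with the 0.8 high-level consumer, a minimal sketch of that loop
might look like the following (assumes auto.commit.enable=false in the
config and an application-supplied process() method; the topic name is
illustrative):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// config is a ConsumerConfig built from your consumer properties
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
    consumer.createMessageStreams(topicCountMap);

for (MessageAndMetadata<byte[], byte[]> msg : streams.get("test").get(0)) {
    process(msg.message());    // do the work first...
    consumer.commitOffsets();  // ...then commit, so a crash replays
                               // (duplicates) rather than loses messages
}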

Guozhang


On Wed, Jul 30, 2014 at 8:25 PM, Jim jimi...@gmail.com wrote:

 Curious on a couple questions...

 Are most people (are you?) using the simple consumer vs. the high-level
 consumer in production?


 What is the common processing paradigm for maintaining a full pipeline for
 kafka consumers for at-least-once messaging? E.g. you pull a batch of 1000
 messages and:

 option 1.
 you wait for the slowest worker to finish working on its message; when you
 get back 1000 acks internally, you commit your offset and pull another batch

 option 2.
 you feed your workers n msgs at a time in sequence and move your offset up
 as you work through your batch

 option 3.
 you maintain a full stream of 1000 messages ideally and as you get acks
 back from your workers you see if you can move your offset up in the stream
 to pull n more messages to fill up your pipeline so you're not blocked by
 the slowest consumer (probability wise)


 any good docs or articles on the subject would be great, thanks!




-- 
-- Guozhang


Re: Issue with unit testing Kafka on 0.8.1.1 and scala 2.9.2

2014-07-31 Thread Sathyanarayanan Nagarajan
I wrote my Java code based on the Kafka unit testing example code. Have you
got it working in Java?

Regards,
Sathya


On Wed, Jul 30, 2014 at 9:07 PM, Jun Rao jun...@gmail.com wrote:

 Perhaps you can follow some examples in Kafka unit test?

 Thanks,

 Jun


 On Tue, Jul 29, 2014 at 7:32 PM, Sathyanarayanan Nagarajan 
 sathy...@gmail.com wrote:

  Hi,
 
  I have been trying to run the Kafka server using TestUtils for my unit
  tests. While the topic gets created, I'm getting the following error:
 
   error when handling request
   Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:1;CorrelationId:9;ClientId:id_0-host_localhost-port_9000;Leaders:id:0,host:localhost,port:9000;PartitionState:(new-topic,0) -> (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:2),AllReplicas:0,1) (kafka.server.KafkaApis:103)
 
  It creates topics and I can check for their existence.

  Output from my program:
  All topics: Map(new-topic -> {})
  topic exists: true
 
  Here's how I'm creating the ZooKeeper client and topic:
 
  zkClient = new ZkClient(zookeeper.connectString(), zkSessionTimeout,
      zkConnectionTimeout, ZKStringSerializer$.MODULE$);

  AdminUtils.createTopic(zkClient, topic, 1, 2,
      AdminUtils.createTopic$default$5());
 
  Kafka version:

  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.9.2</artifactId>
  <version>0.8.1.1</version>
 
  Let me know if anyone has faced this issue and found any resolution for
  the same.
 
  Regards,
  Sathya
 



Re: [DISCUSS] Kafka Security Specific Features

2014-07-31 Thread Rajasekar Elango
Can we get info on the targeted release dates for the 0.8.2 and 0.9
releases, for our planning purposes?

Thanks.
Raja.


On Wed, Jul 30, 2014 at 7:27 PM, Joe Stein joe.st...@stealth.ly wrote:

 The 0.8.2 release will not have the patch inside of it.  Trunk already has
 a lot in it for a point release.  The patch also doesn't account for all
 of the requirements that all of the stakeholders need/want for the
 feature.  Instead of releasing something that is useful for only some, it
 is better to spend the time to get it right for everyone.  We are going to
 have it in the 0.9 release (possibly also with authorization, encryption,
 and more of the security features).

 What we will do is keep the patch rebased against trunk and then the 0.8.2
 branch (once we get to that point), so that folks can apply it to the
 0.8.2 release and do a build from src.  When we get there, I can create a
 write-up or something if folks find problems doing it.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /


 On Wed, Jul 30, 2014 at 7:10 PM, Calvin Lei ckp...@gmail.com wrote:

  Yeah, I just saw that. Looking forward to the prod release of 0.8.2.
 
 
  On Wed, Jul 30, 2014 at 11:01 AM, Rajasekar Elango 
 rela...@salesforce.com
  
  wrote:
 
    We implemented the security features on an older snapshot version of
    0.8 Kafka, but Joe Stein's organization rebased it to the latest
    version of Kafka, available at
    https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477.
  
   Thanks,
   Raja.
  
  
   On Tue, Jul 29, 2014 at 10:54 PM, Calvin Lei ckp...@gmail.com wrote:
  
Raja,
   Which Kafka version is your security enhancement based on?
   
thanks,
Cal
   
   
On Wed, Jul 23, 2014 at 5:01 PM, Chris Neal cwn...@gmail.com
 wrote:
   
 Pramod,

  I got that same error when following the configuration from Raja's
  presentation earlier in this thread.  If you'll notice, the usage for
  console_producer.sh is slightly different, which is also slightly
  different from the Scala code for the ConsoleProducer. :)

 When I changed this:

 bin/kafka-console-producer.sh --broker-list n5:9092:true --topic
 test

 to this:

 bin/kafka-console-producer.sh --broker-list n5:9092 --secure
 --client.security.file config/client.security.properties --topic
 test

  I was able to push messages to the topic, although I got a WARN about the
  property topic not being valid, even though it is required.

 Also, the Producer reported this warning to me:

 [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context
 (kafka.network.security.SecureAuth$)

 and the broker gave me this:
 [2014-07-23 20:45:24,114] INFO begin ssl handshake for
 n5.example.com/192.168.1.144:48817//192.168.1.144:9092
 (kafka.network.security.SSLSocketChannel)
 [2014-07-23 20:45:24,374] INFO finished ssl handshake for
 n5.example.com/192.168.1.144:48817//192.168.1.144:9092
 (kafka.network.security.SSLSocketChannel)
 [2014-07-23 20:45:24,493] INFO Closing socket connection to
 n5.example.com/192.168.1.144. (kafka.network.Processor)
 [2014-07-23 20:45:24,555] INFO begin ssl handshake for
 n5.example.com/192.168.1.144:48818//192.168.1.144:9092
 (kafka.network.security.SSLSocketChannel)
 [2014-07-23 20:45:24,566] INFO finished ssl handshake for
 n5.example.com/192.168.1.144:48818//192.168.1.144:9092
 (kafka.network.security.SSLSocketChannel)

 It's like it did the SSL piece twice :)

 Subsequent puts to the topic did not exhibit this behavior though:

 root@n5[937]:~/kafka_2.10-0-8-2-0.1.0.0
  bin/kafka-console-producer.sh
 --broker-list n5:9092 --secure --client.security.file
 config/client.security.properties --topic test
 [2014-07-23 20:45:17,530] WARN Property topic is not valid
 (kafka.utils.VerifiableProperties)
 1
 [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context
 (kafka.network.security.SecureAuth$)
 2
 3
 4

 Consuming worked with these options:

 root@n5[918]:~/kafka_2.10-0-8-2-0.1.0.0
  bin/kafka-console-consumer.sh
 --topic test --zookeeper n5:2181 --from-beginning
   --security.config.file
 config/client.security.properties
 1
 2
 3
 4
 ^CConsumed 5 messages

 I hope that helps!
 Chris


 On Tue, Jul 22, 2014 at 2:10 PM, Pramod Deshmukh 
 dpram...@gmail.com
  
 wrote:

   Anyone getting this issue? Is it something related to the environment,
   or is it the code? The producer works fine when run with secure=false
   (no security) mode.
 
 
  pdeshmukh$ 

Re: Issues with metrics collection

2014-07-31 Thread Kiran Nagasubramanian
Thanks for the response Jun and Otis.

Jun, you're right. Only the leaders are reporting. One of the brokers is
not a leader for any of the partitions. Found the following info on the
documentation page:

"The leader is the node responsible for all reads and writes for the given
partition. Each node will be the leader for a *randomly* selected portion
of the partitions."

I had been under the assumption that leadership for partitions is
distributed across all the brokers. *Is there a way to make all the brokers
take leadership as evenly as possible?*

We're using a custom app for metrics collection.

Otis, we're using kafka_2.9.2-0.8.1

Thanks.



On Thu, Jul 31, 2014 at 6:26 AM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:

 Hi,

 You may also want to share the version of Kafka.  We have SPM agents for
 Kafka 0.7.x and 0.8.x and we haven't seen that.

 Otis
 --
 Performance Monitoring * Log Analytics * Search Analytics
 Solr & Elasticsearch Support * http://sematext.com/



 On Wed, Jul 30, 2014 at 10:12 PM, Kiran Nagasubramanian 
 nkira...@gmail.com
 wrote:

  Hello,
 
  We're collecting the JMX metrics from the Kafka brokers. We're seeing a
  couple of issues. Could someone please throw some light if you've come
  across something similar?
 
  1) We have a 3 broker Kafka cluster and when we're trying to collect the
  metrics like messages in per sec, bytes in per sec, etc. we get the
 values
  as 0 for one of the three brokers. But we get proper values for metrics
  like heap memory usage for all the brokers. When we restart the cluster,
  the same or some other broker would behave in a similar way.
 
  We're seeing similar behavior in another cluster as well.
 
  2) We're logging the time it takes to collect the metrics. The time to
  collect seems to increase over time and crosses a minute in a couple of
   days. It's on the order of 1 or 2 seconds when we start the cluster.
 
  Thanks.
 



Zookeeper offset

2014-07-31 Thread Srividhya Shanmugam
Kafka Team,

In the integration environment, Kafka and ZooKeeper are running under
supervision. Once in a while, when ZooKeeper and Kafka are shut down and
started back up again, the consumers are not able to read the data from the
topic. I am not seeing any exceptions in the log. The consumer offset
checker utility does show a lag for the consumer group.

Does that mean that when Kafka/ZooKeeper are shut down abruptly, it's
possible the ZooKeeper data directories were not committed with the proper
offsets, or got corrupted? I tried with new consumer groups too, and with
simple consumers. After that point, I am not able to retrieve the data from
the topic. How do I recover the data?

This is a critical problem and any help is really appreciated.

Thanks!

This email and any files transmitted with it are confidential, proprietary and 
intended solely for the individual or entity to whom they are addressed. If you 
have received this email in error please delete it immediately.


Re: Issues with metrics collection

2014-07-31 Thread Joel Koshy

On Thu, Jul 31, 2014 at 11:09:03AM -0700, Kiran Nagasubramanian wrote:
 Thanks for the response Jun and Otis.
 
 Jun, you're right. Only the leaders are reporting. One of the brokers is
 not a leader for any of the partitions. Found the following info on the
 documentation page:
 
  "The leader is the node responsible for all reads and writes for the given
  partition. Each node will be the leader for a *randomly* selected portion
  of the partitions."
  
  I had been under the assumption that leadership for partitions is
  distributed across all the brokers. *Is there a way to make all the
  brokers take leadership as evenly as possible?*

It is distributed, although it may not be an optimal distribution; i.e.,
as long as you have more than just a handful of topics, the leader counts
across the brokers should be relatively even. If you do a rolling bounce
of the cluster, the leader counts can get skewed, in which case you can
run a preferred replica leader election operation:
http://kafka.apache.org/documentation.html#basic_ops_leader_balancing
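
A sketch of that operation with the bundled tool (the ZK connect string is
illustrative; without a --path-to-json-file argument it balances leadership
for all partitions):

bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181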

 
 We're using a custom app for metrics collection.
 
 Otis, we're using kafka_2.9.2-0.8.1
 
 Thanks.
 
 
 
 On Thu, Jul 31, 2014 at 6:26 AM, Otis Gospodnetic 
 otis.gospodne...@gmail.com wrote:
 
  Hi,
 
  You may also want to share the version of Kafka.  We have SPM agents for
  Kafka 0.7.x and 0.8.x and we haven't seen that.
 
  Otis
  --
  Performance Monitoring * Log Analytics * Search Analytics
  Solr & Elasticsearch Support * http://sematext.com/
 
 
 
  On Wed, Jul 30, 2014 at 10:12 PM, Kiran Nagasubramanian 
  nkira...@gmail.com
  wrote:
 
   Hello,
  
   We're collecting the JMX metrics from the Kafka brokers. We're seeing a
   couple of issues. Could someone please throw some light if you've come
   across something similar?
  
   1) We have a 3 broker Kafka cluster and when we're trying to collect the
   metrics like messages in per sec, bytes in per sec, etc. we get the
  values
   as 0 for one of the three brokers. But we get proper values for metrics
   like heap memory usage for all the brokers. When we restart the cluster,
   the same or some other broker would behave in a similar way.
  
   We're seeing similar behavior in another cluster as well.
  
   2) We're logging the time it takes to collect the metrics. The time to
   collect seems to increase over time and crosses a minute in a couple of
    days. It's on the order of 1 or 2 seconds when we start the cluster.
  
   Thanks.
  
 



Re: Unable to read from the beginning using High level consumer API

2014-07-31 Thread Joel Koshy
Set auto.offset.reset to smallest
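
For example, in the consumer properties (note this only takes effect when
the consumer group has no offset committed in ZK yet, so a fresh group.id
is needed to re-read a topic from the beginning):

props.put("auto.offset.reset", "smallest");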

On Thu, Jul 31, 2014 at 08:25:35PM +, Srividhya Shanmugam wrote:
 Kafka Team,
 
 I am using the high-level consumer API as shown below to read contents
 from the topic.
 
 Properties props = new Properties();
 props.put("zookeeper.connect", "localhost:2181");
 props.put("zookeeper.session.timeout.ms", "1");
 props.put("zookeeper.sync.time.ms", "200");
 props.put("auto.commit.interval.ms", "1000");
 props.put("consumer.timeout.ms", "12");
 props.put("group.id", "TEST123");
 ConsumerConfig config = new ConsumerConfig(props);
 
 ConsumerConnector consumer =
     kafka.consumer.Consumer.createJavaConsumerConnector(config);
 
 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
 topicCountMap.put("TEST", new Integer(1));
 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
     consumer.createMessageStreams(topicCountMap);
 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("TEST");
 
 // now launch all the threads
 ThreadPoolExecutor executor = resource.getExecutor();
 // now create an object to consume the messages
 for (final KafkaStream<byte[], byte[]> stream : streams) {
     TestTask task = new TestTask(stream);
     executor.submit(task);
 }
 
 And the TestTask just prints the messages.
 
 The Kafka logger shows the below statement:
 
 Consumer APP51_DFGHSFV1-1406836437053-9ed3b6a7 selected partitions : :0:
 fetched offset = -1: consumed offset = -1,:1: fetched offset = -1:
 consumed offset = -1
 - [APP51_DFGHSFV1-1406836437053-9ed3b6a7],
 
 Even when the fetched and consumed offsets display -1, I am not getting
 the messages from the beginning.
 The retention window policy is set as log.retention.hours=168.
 
 If I produce new messages, then those messages are consumed and I can see
 the logged statements.
 
 If I use the simple consumer API and specify the starting offset as 0,
 then I am able to read from the beginning.
 
 Are there any settings that would enable a new consumer group to read
 messages from the beginning?
 
 Thanks,
 Srividhya
 
 This email and any files transmitted with it are confidential, proprietary 
 and intended solely for the individual or entity to whom they are addressed. 
 If you have received this email in error please delete it immediately.



Re: Apache Kafka error on Windows - Could not find or load main class QuorumPeerMain

2014-07-31 Thread Jun Rao
How did you start ZK?

Thanks,

Jun
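
For reference, the usual way to start the bundled ZooKeeper on Windows is
the batch script shipped with the distribution (paths assume the stock
0.8.x layout and config files):

bin\windows\zookeeper-server-start.bat config\zookeeper.properties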


On Wed, Jul 30, 2014 at 10:12 PM, pradeep.si...@wipro.com wrote:

 Hi,

  Yes, I tried. I added a CLASSPATH environment variable with the value
  D:\Projects\kafka_2.8.0-0.8.1.1\libs\zookeeper-3.3.4.jar and tried to
  start Kafka, but I am still getting the same error.

 Thanks,
 Pradeep Simha
 Technical Lead

 -Original Message-
 From: Jun Rao [mailto:jun...@gmail.com]
 Sent: Thursday, July 31, 2014 10:00 AM
 To: users@kafka.apache.org
 Subject: Re: Apache Kafka error on Windows - Could not find or load main
 class QuorumPeerMain

 Is zookeeper jar in your classpath?

 Thanks,

 Jun


 On Wed, Jul 30, 2014 at 6:13 AM, pradeep.si...@wipro.com wrote:

  Hi Team,
 
  I just downloaded Kafka 2.8.0 from the Apache website, and I am trying to
  set it up using the instructions given on the website. But when I try to
  start the ZooKeeper server, I am getting the below error:
 
  Error: Could not find or load main class
  org.apache.zookeeper.server.quorum.QuorumPeerMain
 
  My environment is Windows 7 64-bit. I tried to follow the e-mail chain
  below: [Apache Email Chain][1]. But it still has the same issue. Can
  anyone guide me on this? I am very new to this and couldn't find much
  information on Google or the Apache Kafka email chain.
 
  [1]:
  http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3CCA
  lupvhxcttkqfe59h_sn-6jzrscqwrgv0twsjz-hjv7ao19...@mail.gmail.com%3E
 
  Thanks,
  Pradeep Simha
  Technical Lead
  Cell: +91-8884382615
  E-mail: pradeep.si...@wipro.com
 
 
 

 The information contained in this electronic message and any attachments
 to this message are intended for the exclusive use of the addressee(s) and
 may contain proprietary, confidential or privileged information. If you are
 not the intended recipient, you should not disseminate, distribute or copy
 this e-mail. Please notify the sender immediately and destroy all copies of
 this message and any attachments.

 WARNING: Computer viruses can be transmitted via email. The recipient
 should check this email and any attachments for the presence of viruses.
 The company accepts no liability for any damage caused by any virus
 transmitted by this email.

 www.wipro.com



Re: undesirable log retention behavior

2014-07-31 Thread Joe Stein
What version of Kafka are you using? Have you tried log.retention.bytes?
Whichever comes first (TTL or total bytes) should do what you are looking
for, if I understand you right.
http://kafka.apache.org/documentation.html#brokerconfigs
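
For example, broker-level settings might look like this (sizes are
illustrative; log.retention.bytes applies per partition, and whichever
threshold is crossed first triggers deletion):

log.retention.hours=24
log.retention.bytes=107374182400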

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
/
On Jul 31, 2014 6:52 PM, Steven Wu steve...@netflix.com.invalid wrote:

 It seems that log retention is purely based on the last touched/modified
 timestamp. This is undesirable for code pushes in AWS/cloud environments.

 E.g., let's say the retention window is 24 hours, disk size is 1 TB, and
 disk utilization is 60% (600GB). When a new instance comes up, it will
 fetch log files (600GB) from peers. Those log files all have newer
 timestamps, so they won't be purged until 24 hours later. Note that during
 the first 24 hours, new msgs (another 600GB) continue to come in. This can
 cause a disk-full problem without any intervention. With this behavior, we
 have to keep disk utilization under 50%.

 Could the last-modified timestamp be inserted into the file name when
 rolling over log files? Then Kafka could check the file name for the
 timestamp. Does this make sense?

 Thanks,
 Steven



Re: undesirable log retention behavior

2014-07-31 Thread Steven Wu
log.retention.bytes can help somewhat, but it is cumbersome to use because
it is a per-topic config that caps each partition.

There was an earlier thread regarding a global bytes limit; that would work
well for my purpose of avoiding disk-full situations.
https://issues.apache.org/jira/browse/KAFKA-1489
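
For reference, the per-topic override can be set with the topics tool, e.g.
(a sketch; the topic name, ZK address, and limit are illustrative):

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic \
  --config retention.bytes=107374182400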


On Thu, Jul 31, 2014 at 7:39 PM, Joe Stein joe.st...@stealth.ly wrote:

  What version of Kafka are you using? Have you tried log.retention.bytes?
  Whichever comes first (TTL or total bytes) should do what you are looking
  for, if I understand you right.
  http://kafka.apache.org/documentation.html#brokerconfigs

 /***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
 /
 On Jul 31, 2014 6:52 PM, Steven Wu steve...@netflix.com.invalid wrote:

   It seems that log retention is purely based on the last touched/modified
   timestamp. This is undesirable for code pushes in AWS/cloud environments.
  
   E.g., let's say the retention window is 24 hours, disk size is 1 TB, and
   disk utilization is 60% (600GB). When a new instance comes up, it will
   fetch log files (600GB) from peers. Those log files all have newer
   timestamps, so they won't be purged until 24 hours later. Note that
   during the first 24 hours, new msgs (another 600GB) continue to come in.
   This can cause a disk-full problem without any intervention. With this
   behavior, we have to keep disk utilization under 50%.
  
   Could the last-modified timestamp be inserted into the file name when
   rolling over log files? Then Kafka could check the file name for the
   timestamp. Does this make sense?
 
  Thanks,
  Steven