Re: [DISCUSS] Kafka Security Specific Features
yeah i just saw that. Looking forward to the prod release of 0.8.2 On Wed, Jul 30, 2014 at 11:01 AM, Rajasekar Elango wrote: > We implemented security features on older snapshot version of 0.8 kafka. > But Joe Stein's organization rebased it to latest version of kafka > available at https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477. > > Thanks, > Raja. > > > On Tue, Jul 29, 2014 at 10:54 PM, Calvin Lei wrote: > > > Raja, > >Which Kafka version is your security enhancement based on? > > > > thanks, > > Cal > > > > > > On Wed, Jul 23, 2014 at 5:01 PM, Chris Neal wrote: > > > > > Pramod, > > > > > > I got that same error when following the configuration from Raja's > > > presentation earlier in this thread. If you'll notice the usage for > the > > > console_producer.sh, it is slightly different, which is also slightly > > > different than the scala code for the ConsoleProducer. :) > > > > > > When I changed this: > > > > > > bin/kafka-console-producer.sh --broker-list n5:9092:true --topic test > > > > > > to this: > > > > > > bin/kafka-console-producer.sh --broker-list n5:9092 --secure > > > --client.security.file config/client.security.properties --topic test > > > > > > I was able to push messages to the topic, although I got a WARN about > the > > > property "topic" not being valid, even though it is required. 
> > > > > > Also, the Producer reported this warning to me: > > > > > > [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context > > > (kafka.network.security.SecureAuth$) > > > > > > and the broker gave me this: > > > [2014-07-23 20:45:24,114] INFO begin ssl handshake for > > > n5.example.com/192.168.1.144:48817//192.168.1.144:9092 > > > (kafka.network.security.SSLSocketChannel) > > > [2014-07-23 20:45:24,374] INFO finished ssl handshake for > > > n5.example.com/192.168.1.144:48817//192.168.1.144:9092 > > > (kafka.network.security.SSLSocketChannel) > > > [2014-07-23 20:45:24,493] INFO Closing socket connection to > > > n5.example.com/192.168.1.144. (kafka.network.Processor) > > > [2014-07-23 20:45:24,555] INFO begin ssl handshake for > > > n5.example.com/192.168.1.144:48818//192.168.1.144:9092 > > > (kafka.network.security.SSLSocketChannel) > > > [2014-07-23 20:45:24,566] INFO finished ssl handshake for > > > n5.example.com/192.168.1.144:48818//192.168.1.144:9092 > > > (kafka.network.security.SSLSocketChannel) > > > > > > It's like it did the SSL piece twice :) > > > > > > Subsequent puts to the topic did not exhibit this behavior though: > > > > > > root@n5[937]:~/kafka_2.10-0-8-2-0.1.0.0> bin/kafka-console-producer.sh > > > --broker-list n5:9092 --secure --client.security.file > > > config/client.security.properties --topic test > > > [2014-07-23 20:45:17,530] WARN Property topic is not valid > > > (kafka.utils.VerifiableProperties) > > > 1 > > > [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context > > > (kafka.network.security.SecureAuth$) > > > 2 > > > 3 > > > 4 > > > > > > Consuming worked with these options: > > > > > > root@n5[918]:~/kafka_2.10-0-8-2-0.1.0.0> bin/kafka-console-consumer.sh > > > --topic test --zookeeper n5:2181 --from-beginning > --security.config.file > > > config/client.security.properties > > > 1 > > > 2 > > > 3 > > > 4 > > > ^CConsumed 5 messages > > > > > > I hope that helps! 
> > > Chris > > > > > > > > > On Tue, Jul 22, 2014 at 2:10 PM, Pramod Deshmukh > > > wrote: > > > > > > > Anyone getting this issue. Is it something related to environment or > it > > > is > > > > the code. Producer works fine when run with secure=false (no > security) > > > > mode. > > > > > > > > > > > > pdeshmukh$ bin/kafka-console-producer.sh --broker-list > > > localhost:9092:true > > > > --topic secureTopic > > > > > > > > [2014-07-18 13:12:29,817] WARN Property topic is not valid > > > > (kafka.utils.VerifiableProperties) > > > > > > > > Hare Krishna > > > > > > > > [2014-07-18 13:12:45,256] WARN Fetching topic metadata with > correlation > > >
Re: [DISCUSS] Kafka Security Specific Features
Raja, Which Kafka version is your security enhancement based on? thanks, Cal On Wed, Jul 23, 2014 at 5:01 PM, Chris Neal wrote: > Pramod, > > I got that same error when following the configuration from Raja's > presentation earlier in this thread. If you'll notice the usage for the > console_producer.sh, it is slightly different, which is also slightly > different than the scala code for the ConsoleProducer. :) > > When I changed this: > > bin/kafka-console-producer.sh --broker-list n5:9092:true --topic test > > to this: > > bin/kafka-console-producer.sh --broker-list n5:9092 --secure > --client.security.file config/client.security.properties --topic test > > I was able to push messages to the topic, although I got a WARN about the > property "topic" not being valid, even though it is required. > > Also, the Producer reported this warning to me: > > [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context > (kafka.network.security.SecureAuth$) > > and the broker gave me this: > [2014-07-23 20:45:24,114] INFO begin ssl handshake for > n5.example.com/192.168.1.144:48817//192.168.1.144:9092 > (kafka.network.security.SSLSocketChannel) > [2014-07-23 20:45:24,374] INFO finished ssl handshake for > n5.example.com/192.168.1.144:48817//192.168.1.144:9092 > (kafka.network.security.SSLSocketChannel) > [2014-07-23 20:45:24,493] INFO Closing socket connection to > n5.example.com/192.168.1.144. 
(kafka.network.Processor) > [2014-07-23 20:45:24,555] INFO begin ssl handshake for > n5.example.com/192.168.1.144:48818//192.168.1.144:9092 > (kafka.network.security.SSLSocketChannel) > [2014-07-23 20:45:24,566] INFO finished ssl handshake for > n5.example.com/192.168.1.144:48818//192.168.1.144:9092 > (kafka.network.security.SSLSocketChannel) > > It's like it did the SSL piece twice :) > > Subsequent puts to the topic did not exhibit this behavior though: > > root@n5[937]:~/kafka_2.10-0-8-2-0.1.0.0> bin/kafka-console-producer.sh > --broker-list n5:9092 --secure --client.security.file > config/client.security.properties --topic test > [2014-07-23 20:45:17,530] WARN Property topic is not valid > (kafka.utils.VerifiableProperties) > 1 > [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context > (kafka.network.security.SecureAuth$) > 2 > 3 > 4 > > Consuming worked with these options: > > root@n5[918]:~/kafka_2.10-0-8-2-0.1.0.0> bin/kafka-console-consumer.sh > --topic test --zookeeper n5:2181 --from-beginning --security.config.file > config/client.security.properties > 1 > 2 > 3 > 4 > ^CConsumed 5 messages > > I hope that helps! > Chris > > > On Tue, Jul 22, 2014 at 2:10 PM, Pramod Deshmukh > wrote: > > > Anyone getting this issue. Is it something related to environment or it > is > > the code. Producer works fine when run with secure=false (no security) > > mode. > > > > > > pdeshmukh$ bin/kafka-console-producer.sh --broker-list > localhost:9092:true > > --topic secureTopic > > > > [2014-07-18 13:12:29,817] WARN Property topic is not valid > > (kafka.utils.VerifiableProperties) > > > > Hare Krishna > > > > [2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation > id > > 0 for topics [Set(secureTopic)] from broker > > [id:0,host:localhost,port:9092,secure:true] failed > > (kafka.client.ClientUtils$) > > > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. 
> > > > at kafka.utils.Utils$.read(Utils.scala:381) > > > > at > > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67) > > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > > > at > > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102) > > > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79) > > > > at > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:117) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) > > > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) > > > > at > > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > > > > at > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > > > > at > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > > > > at scala.collecti
Changing partition broker leader
Hi,

My cluster has 5 topics and each has 1 partition. Each topic has 4 replicas. For some reason, the leaders of all topics are pointing to the same box. Is there a way for me to shuffle the leaders a bit to avoid them all pointing to the same box?

thanks in advance,
Cal
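For the leader-imbalance question above, 0.8 ships an admin tool that moves each partition's leader back to its preferred replica (the first broker in the assigned replica list). A minimal sketch; the ZooKeeper host, topic names, and file path are assumptions, and without --path-to-json-file the tool acts on all partitions:

```shell
# Describe which partitions should move back to their preferred replica.
cat > election.json <<'EOF'
{"partitions":
  [{"topic": "topic1", "partition": 0},
   {"topic": "topic2", "partition": 0}]
}
EOF
# Run from the Kafka install directory against a live cluster:
# bin/kafka-preferred-replica-election.sh --zookeeper zk1:2181 --path-to-json-file election.json
python3 -m json.tool election.json   # sanity-check the JSON before submitting it
```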
Re: PartitionOffsetRequestInfo only supports -1 and -2?
Jun, Regarding the arbitrary timestamp, I found that if I use the GetOffsetShell<https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala>, it works. But if I use the Java API ( requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(SOME_LONG_TIMESTAMP, 1));), it only returns an empty long[]. I looked at the source of GetOffsetShell, the way the offset is retrieved is slightly different from using the java api. Could that be a bug? Scala API: val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets( topicAndPartition).offsets Java API: consumer.getOffsetsBefore(request).offsets(topic, partition); thanks, Cal On Tue, Jan 7, 2014 at 6:49 PM, Jun Rao wrote: > Joe, > > Yes, this is a bit confusing. We have 2 offset related requests. One is > OffsetFetchRequest and another is OffsetRequest. The former is used to get > the offset within a consumer group and the latter is to get the offset > before a particular timestamp. And there is of course the FetchRequest, > which fetches messages on a given offset. > > Thanks, > > Jun > > > On Tue, Jan 7, 2014 at 9:00 AM, Joe Stein wrote: > > > hmmm, that should be the "offset to fetch from", not sure why the > variable > > is called "time" =8^/ unless I am looking at something else from what > you > > are asking but this I think (another dev please confirm or explain why it > > is time). 
> > > > case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int) > > > > should be > > > > case class PartitionOffsetRequestInfo(offsetToFetchFrom: Long, > > maxNumOffsets: Int) > > > > which matches the Fetch API we have documented > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol > > > > > > /*** > > Joe Stein > > Founder, Principal Consultant > > Big Data Open Source Security LLC > > http://www.stealth.ly > > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop> > > / > > > > > > On Tue, Jan 7, 2014 at 11:31 AM, Calvin Lei wrote: > > > > > Does it support an long timestamp? Tried it and the resulting offset > > > response contains an empty long array. > > > > > > > > > > > > > > > > > > -- > > > Sent from Gmail Mobile > > > > > >
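The GetOffsetShell that works for Cal can also be driven from the command line. A sketch under the 0.8 tool layout; the broker address, topic, and flag spellings are assumptions, and --time accepts -1 (latest), -2 (earliest), or an epoch-millis timestamp:

```shell
# Write the invocation to a script so it can be syntax-checked here and run
# later on a broker host; flag names follow the 0.8 GetOffsetShell.
cat > get-offsets.sh <<'EOF'
#!/bin/sh
# $1 is the --time argument: -1, -2, or an epoch-millis timestamp.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic test --partitions 0 \
  --time "$1" --offsets 1
EOF
sh -n get-offsets.sh   # syntax-check only; e.g. ./get-offsets.sh 1389100000000
```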
Re: PartitionOffsetRequestInfo only supports -1 and -2?
Jun, Arbitrary timestamp does not work for me. It would return an empty long array. Cal On Tuesday, January 7, 2014, Jun Rao wrote: > You can provide an arbitrary timestamp. However, the answer is not going to > be exact. The broker only returns the offset of the first message in a log > segment file whose last modified time is less than the provided timestamp. > > Thanks, > > Jun > > > On Tue, Jan 7, 2014 at 8:31 AM, Calvin Lei > > wrote: > > > Does it support an long timestamp? Tried it and the resulting offset > > response contains an empty long array. > > > > > > > > > > > > -- > > Sent from Gmail Mobile > > > -- Sent from Gmail Mobile
PartitionOffsetRequestInfo only supports -1 and -2?
Does it support a long timestamp? I tried it and the resulting offset response contains an empty long array.

-- Sent from Gmail Mobile
Re: Getting None.get exception when running kafka-reassign-partitions.sh
Thanks Jun! The sample json returned from the -help of the script is out of date. On Sun, Oct 13, 2013 at 5:10 PM, Jun Rao wrote: > Are you trying to feed the json file to the --manual-assignment-json-file > option? If so, you need to specify the replicas (see the description of the > option for details). > > Thanks, > > Jun > > > On Sun, Oct 13, 2013 at 8:02 AM, Calvin Lei wrote: > > > Yes. > > > > Partitions reassignment failed due to None.get > > java.util.NoSuchElementException: None.get > >at scala.None$.get (Option.scala:185) > >at scala.None$.get (Option.scala:183) > >at kafka.utils.ZkUtils$$anonfun$parsePartitionReassignmentData$1.apply > > (ZkUtils.scala:571) > >at kafka.utils.ZkUtils$$anonfun$parsePartitionReassignmentData$1.apply > > (ZkUtils.scala:568) > >at scala.collections.LinearSeqOptimied$class.foreach > > (LinearSeqOptimized.scala:61) > >at scala.collection.immutable.List.foreach (List.scala:45) > >at kafka.utils.ZkUtils$.parsePartitionReassignmentData > > (ZkUtils.scala:568) > >at kafka.admin.ReassignPartitionsCommand$.main > > (ReassignPartitionsCommand.scala:58) > >at kafka.admin.ReassignPartitionsCommand.main > > (ReassignPartitionsCommand.scala) > > > > thanks, > > Cal > > > > > > On Sun, Oct 13, 2013 at 2:54 AM, Neha Narkhede > >wrote: > > > > > Can you please send the full stack trace? > > > > > > Thanks, > > > Neha > > > On Oct 12, 2013 1:56 PM, "Calvin Lei" wrote: > > > > > > > Checked the json file and everything seems normal. When I run the > > script > > > > and I got the error: > > > > > > > > Partitions reassignment failed due to None.get > > > > java.util.NoSuchElementException: None.get > > > >at scala.None$.get (Option.scala:185) > > > > > > > > my json file: > > > > > > > > {partitions": > > > >[{"topic": "topicA", "partition": 0}] > > > > } > > > > > > > > > > > > thanks in advance, > > > > Cal > > > > > > > > > >
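Per Jun's pointer, the --manual-assignment-json-file format needs a "replicas" list per partition (and the file quoted above is also missing the opening quote on "partitions"). A sketch; the topic name and broker ids are assumptions:

```shell
cat > manual-assignment.json <<'EOF'
{"partitions":
  [{"topic": "topicA", "partition": 0, "replicas": [1, 2]}]
}
EOF
python3 -m json.tool manual-assignment.json   # verify it parses before feeding it to the tool
```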
Re: Getting None.get exception when running kafka-reassign-partitions.sh
Yes. Partitions reassignment failed due to None.get java.util.NoSuchElementException: None.get at scala.None$.get (Option.scala:185) at scala.None$.get (Option.scala:183) at kafka.utils.ZkUtils$$anonfun$parsePartitionReassignmentData$1.apply (ZkUtils.scala:571) at kafka.utils.ZkUtils$$anonfun$parsePartitionReassignmentData$1.apply (ZkUtils.scala:568) at scala.collections.LinearSeqOptimied$class.foreach (LinearSeqOptimized.scala:61) at scala.collection.immutable.List.foreach (List.scala:45) at kafka.utils.ZkUtils$.parsePartitionReassignmentData (ZkUtils.scala:568) at kafka.admin.ReassignPartitionsCommand$.main (ReassignPartitionsCommand.scala:58) at kafka.admin.ReassignPartitionsCommand.main (ReassignPartitionsCommand.scala) thanks, Cal On Sun, Oct 13, 2013 at 2:54 AM, Neha Narkhede wrote: > Can you please send the full stack trace? > > Thanks, > Neha > On Oct 12, 2013 1:56 PM, "Calvin Lei" wrote: > > > Checked the json file and everything seems normal. When I run the script > > and I got the error: > > > > Partitions reassignment failed due to None.get > > java.util.NoSuchElementException: None.get > >at scala.None$.get (Option.scala:185) > > > > my json file: > > > > {partitions": > >[{"topic": "topicA", "partition": 0}] > > } > > > > > > thanks in advance, > > Cal > > >
Getting None.get exception when running kafka-reassign-partitions.sh
Checked the json file and everything seems normal. When I run the script, I get this error:

Partitions reassignment failed due to None.get
java.util.NoSuchElementException: None.get
   at scala.None$.get (Option.scala:185)

my json file:

{partitions":
   [{"topic": "topicA", "partition": 0}]
}

thanks in advance,
Cal
Re: Recovering a broker that falls out of the isr
Thanks all. In this case, restarting the broker fixed it. It was due to an unusually high volume of messages in one topic. On Sep 27, 2013, at 12:16 AM, Neha Narkhede wrote: > I think you may be asking a slightly different question. If a broker falls > out of ISR and does not rejoin the ISR, it may point to some bottleneck > (e.g. local IO), fewer partitions for large topics or some fatal error > causing the ReplicaFetcherThread to die. Just restarting the broker without > knowing the root cause might not always make the broker rejoin ISR. > > Thanks, > Neha > On Sep 26, 2013 12:48 PM, "Calvin Lei" wrote: >> >> Is restarting the broker the only way to put a broker back to the isr? >> >> Thanks >> Cal
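To spot a replica that has fallen out of the ISR before deciding on a restart, the 0.8-beta list-topic tool prints leader, replicas, and isr per partition. A sketch; the ZooKeeper host and topic are assumptions, and later releases fold this into kafka-topics.sh --describe:

```shell
cat > check-isr.sh <<'EOF'
#!/bin/sh
# Any broker listed under "replicas" but absent from "isr" has fallen behind.
bin/kafka-list-topic.sh --zookeeper zk1:2181 --topic topicA
EOF
sh -n check-isr.sh   # syntax-check only; run on a host with the Kafka scripts installed
```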
Recovering a broker that falls out of the isr
Is restarting the broker the only way to put a broker back into the ISR?

Thanks,
Cal
Re: Securing kafka
That sounds very interesting. Looking forward to it! On Aug 29, 2013 11:23 PM, "Rajasekar Elango" wrote: > We have made changes to kafka code to support certificate based mutual SSL > authentication. So the clients and broker will exchange trusted > certificates for successful communication. This provides both > authentication and ssl encryption. Planning to contribute that code back to > kafka soon. > > Thanks, > Raja. > > > On Thu, Aug 29, 2013 at 11:16 PM, Joe Stein wrote: > > One use case I have been discussing recently with a few clients is > > verifying the digital signature of a message as part of the acceptance > > criteria of it being committed to the log and/or when it is consumed. > > > > I would be very interested in discussing different scenarios such as > Kafka > > as a service, privacy at rest as well as authorization and authentication > > (if required). > > > > Hit me up > > > > /*** > > Joe Stein > > Founder, Principal Consultant > > Big Data Open Source Security LLC > > http://www.stealth.ly > > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop> > > / > > > > > > On Thu, Aug 29, 2013 at 8:13 PM, Jay Kreps wrote: > > > +1 > > > > > > We don't have any application-level security at this time so the answer > > is > > > whatever you can do at the network/system level. > > > > > > -Jay > > > > > > > > > On Thu, Aug 29, 2013 at 10:09 AM, Benjamin Black wrote: > > > > > > > IP filters on the hosts. > > > > On Aug 29, 2013 10:03 AM, "Calvin Lei" wrote: > > > > > > > > > Is there a way to stop a malicious user to connect directly to a > > kafka > > > > > broker and send any messages? Could we have the brokers to accept a > > > > message > > > > > to a list of know IPs? > > > > > > > > > > > > > > > > > > -- > Thanks, > Raja. >
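Benjamin's "IP filters on the hosts" suggestion can be implemented with iptables until application-level security lands. A sketch written to a file rather than executed here, since the rules need root; the port, subnet, and policy are assumptions:

```shell
cat > kafka-ipfilter.sh <<'EOF'
#!/bin/sh
# Accept the known producer/consumer subnet on the broker port, drop the rest.
iptables -A INPUT -p tcp --dport 9092 -s 10.0.0.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 9092 -j DROP
EOF
sh -n kafka-ipfilter.sh   # syntax-check; apply as root on each broker host
```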
Securing kafka
Is there a way to stop a malicious user from connecting directly to a kafka broker and sending arbitrary messages? Could we have the brokers accept messages only from a list of known IPs?
Re: Recommended log level in prod environment.
Thanks for the confirmation Jun. On Jul 23, 2013 12:54 AM, "Jun Rao" wrote: > Yes, the kafka-request log logs every request (in TRACE). It's mostly for > debugging purpose. Other than that, there is no harm to turn it off. > > Thanks, > > Jun > > > On Mon, Jul 22, 2013 at 7:59 PM, Calvin Lei wrote: > > > nah. We just changed it to INFO and will monitor the log. We have GBs of > > logs when it was at trace level. the kafka-request log was going crazy. > > > > > > On Jul 22, 2013, at 10:54 PM, Jay Kreps wrote: > > > > > We run at info too except when debugging stuff. Are you saying that > info > > is > > > too verbose? > > > > > > -Jay > > > > > > > > > On Mon, Jul 22, 2013 at 6:43 PM, Calvin Lei wrote: > > > > > >> The beta release comes with mostly trace level logging. Is this > > >> recommended? I notice our cluster produce way too many logs. I set all > > the > > >> level to info currently. > > >> > > > > >
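The noisy kafka-request log discussed above can be quieted in log4j.properties. A sketch; the logger names follow the stock 0.8 log4j.properties but should be checked against your release, and the appender names are assumptions:

```shell
cat > log4j-request-quiet.properties <<'EOF'
# Keep general logging at INFO.
log4j.rootLogger=INFO, kafkaAppender
# Turn the per-request TRACE logging down to WARN (or OFF to silence it).
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
EOF
grep '=WARN' log4j-request-quiet.properties   # confirm both request loggers are quieted
```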
Re: Recommended log level in prod environment.
nah. We just changed it to INFO and will monitor the log. We have GBs of logs when it was at trace level. the kafka-request log was going crazy. On Jul 22, 2013, at 10:54 PM, Jay Kreps wrote: > We run at info too except when debugging stuff. Are you saying that info is > too verbose? > > -Jay > > > On Mon, Jul 22, 2013 at 6:43 PM, Calvin Lei wrote: > >> The beta release comes with mostly trace level logging. Is this >> recommended? I notice our cluster produce way too many logs. I set all the >> level to info currently. >>
Recommended log level in prod environment.
The beta release comes with mostly trace-level logging. Is this recommended? I notice our cluster produces way too many logs. I have set all the levels to INFO for now.
Re: Questions regarding broker
thanks Joel for looking into it. I will try to reproduce it. I don't think the second zookeeper is needed because i ran into it the first time just by shutting down the topic leaders. Cal On Tue, Jul 16, 2013 at 2:38 AM, Joel Koshy wrote: > Hey Calvin, > > I apologize for not being able to get to this sooner. I don't think I > can reproduce the full scenario exactly as I don't have exclusive > access to so many machines, but I tried it locally and couldn't > reproduce it. Any chance you can reproduce it with a smaller > deployment? Is step 6 required? Would you mind pasting the full stack > trace that you saw? > > Thanks, > > Joel > > > > > On Wed, Jul 10, 2013 at 11:10 PM, Joel Koshy wrote: > > Ok thanks - I'll go through this tomorrow. > > > > Joel > > > > On Wed, Jul 10, 2013 at 9:14 PM, Calvin Lei wrote: > >> Joel, > >>So i was able to reproduce the issue that I experienced. Please see > the > >> steps below. > >> 1. Set up a 3-zookeeper and 6-broker cluster. Setup one topic with 2 > >> partitions, with replication factor set to 3. > >> 2. Setup and run the console consumer, consuming messages from that > topic. > >> 3. Produce a few messages to confirm the consumer is working. > >> 4. Stop the consumer. > >> 5. Shutdown (uncontrolled) the lead broker in one of the partition. > >> 6. Shutdown one of the zookeeper. > >> 7. Run the list topic script to confirm a new leader has been elected > >> 8. Bring up the console consumer again. > >> 9. Console consumer won't start because of error in rebalancing (when > >> fetching topic metadata). > >> Error: Java.util.NoSuchElementException: Key Not Found (5). > >> Trace: Client.Util.Scala:67 > >> > >> Where broker 5 was the lead broker I shut down. I am using 0.8 beta. > >> > >> thanks, > >> Cal > >> > >> > >> On Tue, Jul 9, 2013 at 11:20 PM, Calvin Lei wrote: > >> > >>> I will try to reproduce it. it was sporadic. My set up was a topic > with 1 > >>> partition and replication factor = 3. 
> >>> If i kill the console producer and then shut down the leader broker, a > new > >>> leader is elected. If I again kill the new lead, I dont see the last > broker > >>> be elected as a leader. Then i tried starting the console producer, i > >>> started seeing errors. > >>> > >>> > >>> > >>> > >>> On Tue, Jul 9, 2013 at 6:14 PM, Joel Koshy > wrote: > >>> > >>>> Not really - if you shutdown a leader broker (and assuming your > >>>> replication factor is > 1) then the other assigned replica will be > >>>> elected as the new leader. The producer would then look up metadata, > >>>> find the new leader and send requests to it. What do you see in the > >>>> logs? > >>>> > >>>> Joel > >>>> > >>>> On Tue, Jul 9, 2013 at 1:44 PM, Calvin Lei wrote: > >>>> > Thanks you have me enough pointers to dig deeper. And I tested the > fault > >>>> > tolerance by shutting down brokers randomly. > >>>> > > >>>> > What I noticed is if I shutdown brokers while my producer and > consumer > >>>> are > >>>> > still running, they recover fine. However, if I shutdown a lead > broker > >>>> > without a running producer, I can't seem to start the producer > >>>> afterwards > >>>> > without restarting the previous lead broker. Is this expected? > >>>> > On Jul 9, 2013 10:28 AM, "Joel Koshy" wrote: > >>>> > > >>>> >> For 1 I forgot to add - there is an admin tool to reassign replicas > >>>> but it > >>>> >> would take longer than leader failover. > >>>> >> > >>>> >> Joel > >>>> >> > >>>> >> On Tuesday, July 9, 2013, Joel Koshy wrote: > >>>> >> > >>>> >> > 1 - no, unless broker4 is not the preferred leader. (The > preferred > >>>> >> > leader is the first broker in the assigned replica list). If a > >>>> >> > non-preferred replica is the current leader you can run the > >>>> >> > PreferredReplicaLeaderElection admin command to move the leader. > >>>> >> > 2 - Th
Re: Questions on mirror maker
Joel, For #1, I meant multiple consumer configs. Please excuse me for the typo. For #2, turns out i started the mirror before i brought up the kafka cluster, hence all the messages failed to send to the remote cluster. thanks, Cal On Tue, Jul 16, 2013 at 3:16 AM, Joel Koshy wrote: > Calvin, > > For (1) can you clarify what you mean by "multiplied" consumer configs? > For (2) the mirror-maker actually uses the high level consumer. > > Thanks, > > Joel > > On Sun, Jul 14, 2013 at 7:43 AM, Calvin Lei wrote: > > Hi all, > > I have two questions regarding setting up mirror maker for our cross > cluster replication (DC1 to DC2, for instance) > > 1. In what use case you would want to specify multiplied consumer > configs? > > 2. It seems like the consumer inside the mirror is a SimpleConsumer. > Is it possible to switch it to HghLevelConsumer? Assuming our DC2 is down, > I would shut down the mirror until DC2 is back up. Once it is up, I want > the mirror to pick up when it is left off and start replicating messages > that has not been delivered to DC2. By using a Highlevel consumer, that > kind of behavior is built in because the offset is being saved in the DC1 > zookeeper. am i correct? > > > > thanks, > > Cal >
Questions on mirror maker
Hi all,

I have two questions regarding setting up mirror maker for our cross-cluster replication (DC1 to DC2, for instance):

1. In what use case would you want to specify multiplied consumer configs?

2. It seems like the consumer inside the mirror is a SimpleConsumer. Is it possible to switch it to a high-level consumer? Assuming our DC2 is down, I would shut down the mirror until DC2 is back up. Once it is up, I want the mirror to pick up where it left off and start replicating the messages that have not been delivered to DC2. By using a high-level consumer, that kind of behavior is built in because the offset is being saved in the DC1 zookeeper. Am I correct?

thanks,
Cal
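For reference, a minimal MirrorMaker invocation for the DC1-to-DC2 setup described above. File names, the topic whitelist, and stream count are assumptions; --consumer.config can be repeated, which is the usual reason to pass multiple consumer configs, and since the consumer side is the high-level consumer (as Joel confirms in the reply), offsets are committed to the source ZooKeeper and the mirror resumes where it left off after a restart:

```shell
cat > run-mirror.sh <<'EOF'
#!/bin/sh
# Consume from DC1, produce into DC2; run this near the target (DC2) cluster.
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config dc1-consumer.properties \
  --producer.config dc2-producer.properties \
  --whitelist 'topicA|topicB' \
  --num.streams 2
EOF
sh -n run-mirror.sh   # syntax-check only; needs live source and target clusters to run
```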
Re: Combating network latency best practice
Thanks Jay. We will still suffer from network latency if we use remote write. We probably will explore more on the idea of having local cluster and mirror messages across the DC. thanks, Cal On Wed, Jul 10, 2013 at 12:04 PM, Jay Kreps wrote: > To publish to a remote data center just configure the producers with the > host/port of the remote datacenter. To ensure good throughput you may want > to tune the socket send and receive buffers on the client and server to > avoid small roundtrips: > http://en.wikipedia.org/wiki/Bandwidth-delay_product > > -Jay > > > > On Wed, Jul 10, 2013 at 6:57 AM, Calvin Lei wrote: > > > Thanks Jay. I thought of using the worldview architecture you suggested. > > But since our consumers are also globally deployed, which means any new > > messages arrive the worldview needs to be replicated back to the local > DCs, > > making the topology a bit complicated. > > > > Would you please elaborate on the remote write? How do I achieve it? > > On Jul 10, 2013 1:08 AM, "Jay Kreps" wrote: > > > > > Ah, good question we really should add this to the documentation. > > > > > > We run a cluster per data center. All writes always go to the > data-center > > > local cluster. Replication to aggregate clusters that provide the > "world > > > wide" view is done with mirror maker. > > > > > > It is also fine to write to or read from a kafka cluster in a remote > > colo, > > > though obviously you have to think about the case where the cluster is > > not > > > accessible due to network access. > > > > > > Kafka is not designed to run a single cluster spread across > > geographically > > > disparate colos and you would see a few problems in that scenario. The > > > first is that, as you noted, the latency will be terrible as it will > > block > > > on the slowest response from all datacenters. This could be avoided if > > you > > > lowered the request.required.acks to 1, but that would impact > durability > > > guarantees. 
The second problem is that Kafka will not remain available > in > > > the presence of network partitions so if the inter-datacenter link > failed > > > one datacenter would lose its cluster. Finally we have not done > anything > > to > > > attempt to optimize partition placement by colo so you would not > actually > > > have redundancy between colos because we would often place all replicas > > in > > > a single colo. > > > > > > -Jay > > > > > > > > > On Tue, Jul 9, 2013 at 9:34 PM, Calvin Lei wrote: > > > > > > > Folks, > > > >Our application has multiple producers globally (region1, region2, > > > > region3). If we group all the brokers together into one cluster, we > > > notice > > > > an obvious network latency if a broker replicates regionally with the > > > > request.required.acks = -1. > > > > > > > >Is there any best practice for combating the network latency in > the > > > > deployment topology? Should we segregate the brokers regionally (one > > > kafka > > > > cluster per region) and set up MirrorMaker between the regions > (region1 > > > > <--> region2, region2 <--> region3, region1 <--> region3), total of 6 > > > > mirror makes? > > > > > > > > > > > > Thanks. > > > > > > > > > >
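Jay's bandwidth-delay-product pointer translates into a quick sizing rule: the socket buffer should hold at least bandwidth times round-trip time. A sketch; the 1 Gbps link and 100 ms RTT are assumptions, and the property names (send.buffer.bytes on the 0.8 producer, socket.send.buffer.bytes / socket.receive.buffer.bytes on the broker) should be checked against your release:

```shell
# Bandwidth-delay product: bytes in flight on the link at full utilization.
BW_BITS_PER_SEC=1000000000   # assumed 1 Gbps inter-DC link
RTT_MS=100                   # assumed 100 ms round trip
BDP_BYTES=$(( BW_BITS_PER_SEC / 8 * RTT_MS / 1000 ))
echo "socket buffers should be >= ${BDP_BYTES} bytes"
```

For these assumed numbers that works out to 12,500,000 bytes, i.e. roughly 12 MB per socket, far above typical OS defaults.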
Re: Questions regarding broker
Joel, So i was able to reproduce the issue that I experienced. Please see the steps below. 1. Set up a 3-zookeeper and 6-broker cluster. Setup one topic with 2 partitions, with replication factor set to 3. 2. Setup and run the console consumer, consuming messages from that topic. 3. Produce a few messages to confirm the consumer is working. 4. Stop the consumer. 5. Shutdown (uncontrolled) the lead broker in one of the partition. 6. Shutdown one of the zookeeper. 7. Run the list topic script to confirm a new leader has been elected 8. Bring up the console consumer again. 9. Console consumer won't start because of error in rebalancing (when fetching topic metadata). Error: Java.util.NoSuchElementException: Key Not Found (5). Trace: Client.Util.Scala:67 Where broker 5 was the lead broker I shut down. I am using 0.8 beta. thanks, Cal On Tue, Jul 9, 2013 at 11:20 PM, Calvin Lei wrote: > I will try to reproduce it. it was sporadic. My set up was a topic with 1 > partition and replication factor = 3. > If i kill the console producer and then shut down the leader broker, a new > leader is elected. If I again kill the new lead, I dont see the last broker > be elected as a leader. Then i tried starting the console producer, i > started seeing errors. > > > > > On Tue, Jul 9, 2013 at 6:14 PM, Joel Koshy wrote: > >> Not really - if you shutdown a leader broker (and assuming your >> replication factor is > 1) then the other assigned replica will be >> elected as the new leader. The producer would then look up metadata, >> find the new leader and send requests to it. What do you see in the >> logs? >> >> Joel >> >> On Tue, Jul 9, 2013 at 1:44 PM, Calvin Lei wrote: >> > Thanks you have me enough pointers to dig deeper. And I tested the fault >> > tolerance by shutting down brokers randomly. >> > >> > What I noticed is if I shutdown brokers while my producer and consumer >> are >> > still running, they recover fine. 
However, if I shutdown a lead broker >> > without a running producer, I can't seem to start the producer >> afterwards >> > without restarting the previous lead broker. Is this expected? >> > On Jul 9, 2013 10:28 AM, "Joel Koshy" wrote: >> > >> >> For 1 I forgot to add - there is an admin tool to reassign replicas >> but it >> >> would take longer than leader failover. >> >> >> >> Joel >> >> >> >> On Tuesday, July 9, 2013, Joel Koshy wrote: >> >> >> >> > 1 - no, unless broker4 is not the preferred leader. (The preferred >> >> > leader is the first broker in the assigned replica list). If a >> >> > non-preferred replica is the current leader you can run the >> >> > PreferredReplicaLeaderElection admin command to move the leader. >> >> > 2 - The actual leader movement (on leader failover) is fairly low - >> >> > probably of the order of tens of ms. However, clients (producers, >> >> > consumers) may take longer to detect that (it needs to get back an >> >> > error response, handle an exception, issue a metadata request, get >> the >> >> > response to find the new leader, and all that can add up but it >> should >> >> > not be terribly high - I'm guessing on the order of a few hundred ms >> >> > to a second or so). >> >> > 3 - That should work, although the admin command for adding more >> >> > partitions to a topic is currently being developed. >> >> > >> >> > >> >> > On Mon, Jul 8, 2013 at 11:02 PM, Calvin Lei >> wrote: >> >> > > Hi, >> >> > > I have two questions regarding the kafka broker setup. >> >> > > >> >> > > 1. Assuming i have a 4-broker and 2-zookeeper (running in quorum >> mode) >> >> > > setup, if topicA-partition0 has the leader set to broker4, can I >> change >> >> > the >> >> > > leader to other broker without killing the current leader? >> >> > > >> >> > > 2. What is the latency of switching to a different leader when the >> >> > current >> >> > > leader is down? 
Do we configure it using the consumer property - >> >> > > refresh.leader.backoff.ms >> >> > > >> >> > > 3. What is the best practice of dynamically adding a new node to a >> >> kafka >> >> > > cluster? Should i bring up the node, and then increase the >> replication >> >> > > factor for the existing topic(s)? >> >> > > >> >> > > >> >> > > thanks in advance, >> >> > > Cal >> >> > >> >> >> > >
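Cal's reproduction steps above can be sketched with the 0.8-era command-line tools. This is a sketch, not the exact commands from the thread: the zookeeper/broker host names and the topic name are placeholder assumptions.

```shell
# 1. Create the topic: 2 partitions, replication factor 3
bin/kafka-create-topic.sh --zookeeper zk1:2181 --topic test \
  --partition 2 --replica 3

# 2./3. Start the console consumer, then produce a few messages
#       from another terminal to confirm it is working
bin/kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
bin/kafka-console-producer.sh --broker-list broker1:9092 --topic test

# 4.-6. Stop the consumer (Ctrl-C), hard-kill the lead broker of one
#       partition, and stop one of the three zookeepers.

# 7. Confirm a new leader has been elected
bin/kafka-list-topic.sh --zookeeper zk1:2181 --topic test

# 8./9. Restart the console consumer; in the case reported above it fails
#       while fetching topic metadata during rebalance with
#       java.util.NoSuchElementException: key not found: 5 (ClientUtils.scala)
```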
Re: Combating network latency best practice
Thanks Jay. I thought of using the worldview architecture you suggested. But since our consumers are also globally deployed, any new messages arriving in the worldview cluster would need to be replicated back to the local DCs, which makes the topology a bit complicated. Could you please elaborate on the remote write? How do I achieve it? On Jul 10, 2013 1:08 AM, "Jay Kreps" wrote: > Ah, good question we really should add this to the documentation. > > We run a cluster per data center. All writes always go to the data-center > local cluster. Replication to aggregate clusters that provide the "world > wide" view is done with mirror maker. > > It is also fine to write to or read from a kafka cluster in a remote colo, > though obviously you have to think about the case where the cluster is not > accessible due to network access. > > Kafka is not designed to run a single cluster spread across geographically > disparate colos and you would see a few problems in that scenario. The > first is that, as you noted, the latency will be terrible as it will block > on the slowest response from all datacenters. This could be avoided if you > lowered the request.required.acks to 1, but that would impact durability > guarantees. The second problem is that Kafka will not remain available in > the presence of network partitions so if the inter-datacenter link failed > one datacenter would lose its cluster. Finally we have not done anything to > attempt to optimize partition placement by colo so you would not actually > have redundancy between colos because we would often place all replicas in > a single colo. > > -Jay > > > On Tue, Jul 9, 2013 at 9:34 PM, Calvin Lei wrote: > > > Folks, > >Our application has multiple producers globally (region1, region2, > > region3). If we group all the brokers together into one cluster, we > notice > > an obvious network latency if a broker replicates regionally with the > > request.required.acks = -1.
> > > >Is there any best practice for combating the network latency in the > > deployment topology? Should we segregate the brokers regionally (one > kafka > > cluster per region) and set up MirrorMaker between the regions (region1 > > <--> region2, region2 <--> region3, region1 <--> region3), total of 6 > > mirror makes? > > > > > > Thanks. > > >
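Jay's per-datacenter layout can be sketched with the 0.8 MirrorMaker tool: one aggregate ("world view") cluster fed by each regional cluster. A minimal sketch, where the host names and file names are illustrative assumptions, not from the thread:

```shell
# consumer.properties -- consume from the region-local cluster:
#   zookeeper.connect=zk-region1:2181
#   group.id=mirror-region1-to-agg
# producer.properties -- produce into the aggregate cluster:
#   metadata.broker.list=agg-broker1:9092,agg-broker2:9092

# One mirror-maker pipeline per source region, each feeding the aggregate:
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties \
  --producer.config producer.properties \
  --whitelist '.*'
```

With this fan-in topology you run one pipeline per region into the aggregate cluster (3 total for 3 regions) rather than the 6 pairwise pipelines proposed in the original question; consumers that need the world view read from the aggregate cluster.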
Combating network latency best practice
Folks, our application has producers deployed globally (region1, region2, region3). If we group all the brokers together into one cluster, we notice obvious network latency when a broker replicates across regions with request.required.acks = -1. Is there any best practice for combating this network latency in the deployment topology? Should we segregate the brokers regionally (one kafka cluster per region) and set up MirrorMaker between the regions (region1 <--> region2, region2 <--> region3, region1 <--> region3), a total of 6 mirror makers? Thanks.
Re: Questions regarding broker
I will try to reproduce it. It was sporadic. My setup was a topic with 1 partition and replication factor = 3. If I kill the console producer and then shut down the leader broker, a new leader is elected. If I again kill the new leader, I don't see the last broker being elected as leader. Then when I tried starting the console producer, I started seeing errors. On Tue, Jul 9, 2013 at 6:14 PM, Joel Koshy wrote: > Not really - if you shutdown a leader broker (and assuming your > replication factor is > 1) then the other assigned replica will be > elected as the new leader. The producer would then look up metadata, > find the new leader and send requests to it. What do you see in the > logs? > > Joel > > On Tue, Jul 9, 2013 at 1:44 PM, Calvin Lei wrote: > > Thanks you have me enough pointers to dig deeper. And I tested the fault > > tolerance by shutting down brokers randomly. > > > > What I noticed is if I shutdown brokers while my producer and consumer > are > > still running, they recover fine. However, if I shutdown a lead broker > > without a running producer, I can't seem to start the producer afterwards > > without restarting the previous lead broker. Is this expected? > > On Jul 9, 2013 10:28 AM, "Joel Koshy" wrote: > > > >> For 1 I forgot to add - there is an admin tool to reassign replicas but > it > >> would take longer than leader failover. > >> > >> Joel > >> > >> On Tuesday, July 9, 2013, Joel Koshy wrote: > >> > >> > 1 - no, unless broker4 is not the preferred leader. (The preferred > >> > leader is the first broker in the assigned replica list). If a > >> > non-preferred replica is the current leader you can run the > >> > PreferredReplicaLeaderElection admin command to move the leader. > >> > 2 - The actual leader movement (on leader failover) is fairly low - > >> > probably of the order of tens of ms.
However, clients (producers, > >> > consumers) may take longer to detect that (it needs to get back an > >> > error response, handle an exception, issue a metadata request, get the > >> > response to find the new leader, and all that can add up but it should > >> > not be terribly high - I'm guessing on the order of a few hundred ms > >> > to a second or so). > >> > 3 - That should work, although the admin command for adding more > >> > partitions to a topic is currently being developed. > >> > > >> > > >> > On Mon, Jul 8, 2013 at 11:02 PM, Calvin Lei wrote: > >> > > Hi, > >> > > I have two questions regarding the kafka broker setup. > >> > > > >> > > 1. Assuming i have a 4-broker and 2-zookeeper (running in quorum > mode) > >> > > setup, if topicA-partition0 has the leader set to broker4, can I > change > >> > the > >> > > leader to other broker without killing the current leader? > >> > > > >> > > 2. What is the latency of switching to a different leader when the > >> > current > >> > > leader is down? Do we configure it using the consumer property - > >> > > refresh.leader.backoff.ms > >> > > > >> > > 3. What is the best practice of dynamically adding a new node to a > >> kafka > >> > > cluster? Should i bring up the node, and then increase the > replication > >> > > factor for the existing topic(s)? > >> > > > >> > > > >> > > thanks in advance, > >> > > Cal > >> > > >> >
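The detection knobs behind Joel's latency estimate live in the client configs. A sketch of the relevant 0.8-era properties; the values shown are the documented defaults, tune to taste:

```
# Consumer: how long to back off before re-fetching leader metadata
# when a partition leader is temporarily unavailable
refresh.leader.backoff.ms=200

# Producer: how many times to retry a failed send, and the pause
# between retries (each retry refreshes metadata to find the new leader)
message.send.max.retries=3
retry.backoff.ms=100
```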
Re: Questions regarding broker
Thanks, you gave me enough pointers to dig deeper. And I tested the fault tolerance by shutting down brokers randomly. What I noticed is that if I shut down brokers while my producer and consumer are still running, they recover fine. However, if I shut down a lead broker without a running producer, I can't seem to start the producer afterwards without restarting the previous lead broker. Is this expected? On Jul 9, 2013 10:28 AM, "Joel Koshy" wrote: > For 1 I forgot to add - there is an admin tool to reassign replicas but it > would take longer than leader failover. > > Joel > > On Tuesday, July 9, 2013, Joel Koshy wrote: > > > 1 - no, unless broker4 is not the preferred leader. (The preferred > > leader is the first broker in the assigned replica list). If a > > non-preferred replica is the current leader you can run the > > PreferredReplicaLeaderElection admin command to move the leader. > > 2 - The actual leader movement (on leader failover) is fairly low - > > probably of the order of tens of ms. However, clients (producers, > > consumers) may take longer to detect that (it needs to get back an > > error response, handle an exception, issue a metadata request, get the > > response to find the new leader, and all that can add up but it should > > not be terribly high - I'm guessing on the order of a few hundred ms > > to a second or so). > > 3 - That should work, although the admin command for adding more > > partitions to a topic is currently being developed. > > > > > > On Mon, Jul 8, 2013 at 11:02 PM, Calvin Lei wrote: > > > Hi, > > > I have two questions regarding the kafka broker setup. > > > > > > 1. Assuming i have a 4-broker and 2-zookeeper (running in quorum mode) > > > setup, if topicA-partition0 has the leader set to broker4, can I change > > the > > > leader to other broker without killing the current leader? > > > > > > 2. What is the latency of switching to a different leader when the > > current > > > leader is down?
Do we configure it using the consumer property - > > > refresh.leader.backoff.ms > > > > > > 3. What is the best practice of dynamically adding a new node to a > kafka > > > cluster? Should i bring up the node, and then increase the replication > > > factor for the existing topic(s)? > > > > > > > > > thanks in advance, > > > Cal > > >
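The admin command Joel mentions for question 1 can be invoked roughly like this in 0.8 (a sketch; the zookeeper address is a placeholder, and without a JSON file listing specific partitions the tool acts on all topic-partitions):

```shell
bin/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand \
  --zookeeper zk1:2181
```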
Questions regarding broker
Hi, I have a few questions regarding the kafka broker setup.

1. Assuming I have a 4-broker and 2-zookeeper (running in quorum mode) setup, if topicA-partition0 has the leader set to broker4, can I change the leader to another broker without killing the current leader?

2. What is the latency of switching to a different leader when the current leader is down? Do we configure it using the consumer property refresh.leader.backoff.ms?

3. What is the best practice for dynamically adding a new node to a kafka cluster? Should I bring up the node and then increase the replication factor for the existing topic(s)?

thanks in advance,
Cal
Re: Changing the number of partitions after a topic is created
Thanks Philip. I used the kafka-topic.sh script to create the topic with 1 partition. Would changing the server properties still work in this case? Also, this setting sounds like a global override, correct? regards, Cal On Jul 4, 2013, at 11:49 AM, Philip O'Toole wrote: > If you can pause your Producers, simply change the partition count to > 10 in the Kafka server properties file, and restart it. If you use the > high-level consumer, it will automatically start draining all > partitions that exist for a given topic. > > And our Kafka producers always write to partition -1 for a given > topic, meaning they never care about the partition count, and need no > changes in circumstances like this. > > Philip > > On Thu, Jul 4, 2013 at 11:11 AM, Calvin Lei wrote: >> Hi I have a few topics created with 1 partition. After running the cluster >> for a few days, I want to increase the partition to 10 to improve the >> consumer throughput. I learnt that it is not supported in 0.8. >> >> What is the recommendation of improving consumer throughput after a topic >> is created and the data volume increased? >> >> Regards, >> Cal
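For reference, Philip's suggestion amounts to the following broker setting. A sketch, with a caveat: as I understand the 0.8 broker, num.partitions is the default applied when a topic is auto-created, so whether it retroactively changes a topic that was created explicitly is exactly the open question Cal raises above.

```
# server.properties (broker config) -- default partition count for
# newly auto-created topics; value illustrative
num.partitions=10
```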
Handling error
Hi, is there any documentation for the exceptions thrown by Kafka? I am trying to capture errors coming from Kafka, for example when a producer fails to connect to zookeeper because it is down. Regards, Cal
Changing the number of partitions after a topic is created
Hi, I have a few topics created with 1 partition. After running the cluster for a few days, I want to increase the partition count to 10 to improve consumer throughput. I learnt that this is not supported in 0.8. What is the recommended way to improve consumer throughput after a topic is created and the data volume has increased? Regards, Cal