much reduced IO utilization after upgrading from 0.8.0 to 0.8.1.1

2014-07-22 Thread Jason Rosenberg
I recently upgraded some of our kafka clusters to use 0.8.1.1 (from 0.8.0).
 It's all looking good so far.  One thing I notice though (seems like a
good thing) is that the iostat utilization has gone way down after the
upgrade.

I'm not sure exactly what could be responsible for this; is this an expected
result?

Is it possibly related to:  https://issues.apache.org/jira/browse/KAFKA-615

Thanks,

Jason


Re: how to ensure strong consistency with reasonable availability

2014-07-22 Thread scott@heroku
Thanks so much for the detailed explanation, Jun; it pretty much lines up with 
my understanding.

In the case below, if we didn't particularly care about ordering and 
re-produced m2, it would then become m5, and in many use cases this would be ok?

Perhaps a more direct question would be: once 0.8.2 is out and I have a topic 
with unclean leader election disabled, and I produce with acks = -1, is there 
any known sequence of events (other than disk failures on all brokers) that would 
cause the loss of messages that a producer has received an ack for?
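
(For reference, a minimal sketch of the setup I have in mind, using the 0.8.x
"old" Java producer API; the broker addresses and topic name are placeholders,
and the property names are worth double-checking against the docs. Unclean
leader election would additionally be disabled on the broker side once 0.8.2
is available.)

// Illustrative only: 0.8.x producer configured with request.required.acks=-1.
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AllIsrAcksProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1"); // wait for all in-sync replicas

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("my-topic", "a message I don't want to lose"));
        producer.close();
    }
}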





Sent from my iPhone

> On Jul 22, 2014, at 8:17 PM, Jun Rao  wrote:
> 
> The key point is that we have to keep all replicas consistent with each
> other such that no matter which replica a consumer reads from, it always
> reads the same data on a given offset.  The following is an example.
> 
> Suppose we have 3 brokers A, B and C. Let's assume A is the leader and at
> some point, we have the following offsets and messages in each replica.
> 
> offset   A    B    C
> 1        m1   m1   m1
> 2        m2
> 
> Let's assume that message m1 is committed and message m2 is not. At exactly
> this moment, replica A dies. After a new leader is elected, say B,  new
> messages can be committed with just replica B and C. Some point later if we
> commit two more messages m3 and m4, we will have the following.
> 
> offset   A    B    C
> 1        m1   m1   m1
> 2        m2   m3   m3
> 3             m4   m4
> 
> Now A comes back. For consistency, it's important for A's log to be
> identical to B and C. So, we have to remove m2 from A's log and add m3 and
> m4. As you can see, whether you want to republish m2 or not, m2 cannot stay
> in its current offset, since in other replicas, that offset is already
> taken by other messages. Therefore, a truncation of replica A's log is
> needed to keep the replicas consistent. Currently, we don't republish
> messages like m2 since (1) it's not necessary, as m2 was never considered
> committed; and (2) it would make our protocol more complicated.
> 
> Thanks,
> 
> Jun
> 
> 
> 
> 
>> On Tue, Jul 22, 2014 at 3:40 PM, scott@heroku  wrote:
>> 
>> Thanks Jun
>> 
>> Can you explain a little more about what an uncommitted message means?
>> The messages are in the log, so presumably they have been acked at least
>> by the local broker.
>> 
>> I guess I am hoping for some intuition around why 'replaying' the messages
>> in question would cause bad things.
>> 
>> Thanks!
>> 
>> 
>> Sent from my iPhone
>>> On Jul 22, 2014, at 3:06 PM, Jun Rao  wrote:
>>> 
>>> Scott,
>>> 
>>> The reason for truncation is that the broker that comes back may have
>> some
>>> un-committed messages. Those messages shouldn't be exposed to the
>> consumer
>>> and therefore need to be removed from the log. So, on broker startup, we
>>> first truncate the log to a safe point before which we know all messages
>>> are committed. This broker will then sync up with the current leader to
>> get
>>> the remaining messages.
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
 On Tue, Jul 22, 2014 at 9:42 AM, Scott Clasen  wrote:
 
 Ahh, yes that message loss case. I've wondered about that myself.
 
 I guess I dont really understand why truncating messages is ever the
>> right
 thing to do.  As kafka is an 'at least once' system. (send a message,
>> get
 no ack, it still might be on the topic) consumers that care will have to
 de-dupe anyhow.
 
 To the kafka designers:  is there anything preventing implementation of
 alternatives to truncation? when a broker comes back online and needs to
 truncate, cant it fire up a producer and take the extra messages and
>> send
 them back to the original topic or alternatively an error topic?
 
 Would love to understand the rationale for the current design, as my
 perspective is doubtfully as clear as the designers'
 
 
 
 
 On Tue, Jul 22, 2014 at 6:21 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
 LEX -)  wrote:
 
> kafka-1028 addressed another unclean leader election problem. It
>> prevents
> a broker not in ISR from becoming a leader. The problem we are facing
>> is
> that a broker in ISR but without complete messages may become a leader.
> It's also a kind of unclean leader election, but not the one that
> kafka-1028 addressed.
> 
> Here I'm trying to give a proof that current kafka doesn't achieve the
> requirement (no message loss, no blocking when 1 broker down) due to
>> its
> two behaviors:
> 1. when choosing a new leader from 2 followers in ISR, the one with
>> less
> messages may be chosen as the leader
> 2. even when replica.lag.max.messages=0, a follower can stay in ISR
>> when
> it has less messages than the leader.
> 
> We consider a cluster with 3 brokers and a topic with 3 replicas. We
> analyze different cases according to the value of request.required.acks
> (acks for short). For each case a

Re: how to ensure strong consistency with reasonable availability

2014-07-22 Thread Jun Rao
The key point is that we have to keep all replicas consistent with each
other, such that no matter which replica a consumer reads from, it always
reads the same data at a given offset.  The following is an example.

Suppose we have 3 brokers A, B and C. Let's assume A is the leader and at
some point, we have the following offsets and messages in each replica.

offset   A    B    C
1        m1   m1   m1
2        m2

Let's assume that message m1 is committed and message m2 is not. At exactly
this moment, replica A dies. After a new leader is elected, say B,  new
messages can be committed with just replica B and C. Some point later if we
commit two more messages m3 and m4, we will have the following.

offset   A    B    C
1        m1   m1   m1
2        m2   m3   m3
3             m4   m4

Now A comes back. For consistency, it's important for A's log to be
identical to B and C. So, we have to remove m2 from A's log and add m3 and
m4. As you can see, whether you want to republish m2 or not, m2 cannot stay
in its current offset, since in other replicas, that offset is already
taken by other messages. Therefore, a truncation of replica A's log is
needed to keep the replicas consistent. Currently, we don't republish
messages like m2 since (1) it's not necessary, as m2 was never considered
committed; and (2) it would make our protocol more complicated.
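
To make the bookkeeping concrete, here is a minimal sketch (plain Java with
invented names, not broker code) of the "truncate to the last committed
offset, then re-fetch from the current leader" step described above:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TruncationSketch {
    // Keep only the committed prefix of the returning replica's log,
    // then copy whatever the current leader has beyond that point.
    static List<String> recover(List<String> returning, List<String> leader, int committed) {
        List<String> repaired = new ArrayList<String>(returning.subList(0, committed));
        repaired.addAll(leader.subList(committed, leader.size()));
        return repaired;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("m1", "m2");        // replica A: m2 was never committed
        List<String> b = Arrays.asList("m1", "m3", "m4");  // replica B committed m3, m4 as the new leader
        System.out.println(recover(a, b, 1));               // [m1, m3, m4] -- identical to B and C again
    }
}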

Thanks,

Jun




On Tue, Jul 22, 2014 at 3:40 PM, scott@heroku  wrote:

> Thanks Jun
>
> Can you explain a little more about what an uncommitted message means?
>  The messages are in the log, so presumably they have been acked at least
> by the local broker.
>
> I guess I am hoping for some intuition around why 'replaying' the messages
> in question would cause bad things.
>
> Thanks!
>
>
> Sent from my iPhone
> > On Jul 22, 2014, at 3:06 PM, Jun Rao  wrote:
> >
> > Scott,
> >
> > The reason for truncation is that the broker that comes back may have
> some
> > un-committed messages. Those messages shouldn't be exposed to the
> consumer
> > and therefore need to be removed from the log. So, on broker startup, we
> > first truncate the log to a safe point before which we know all messages
> > are committed. This broker will then sync up with the current leader to
> get
> > the remaining messages.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >> On Tue, Jul 22, 2014 at 9:42 AM, Scott Clasen  wrote:
> >>
> >> Ahh, yes that message loss case. I've wondered about that myself.
> >>
> >> I guess I dont really understand why truncating messages is ever the
> right
> >> thing to do.  As kafka is an 'at least once' system. (send a message,
> get
> >> no ack, it still might be on the topic) consumers that care will have to
> >> de-dupe anyhow.
> >>
> >> To the kafka designers:  is there anything preventing implementation of
> >> alternatives to truncation? when a broker comes back online and needs to
> >> truncate, cant it fire up a producer and take the extra messages and
> send
> >> them back to the original topic or alternatively an error topic?
> >>
> >> Would love to understand the rationale for the current design, as my
> >> perspective is doubtfully as clear as the designers'
> >>
> >>
> >>
> >>
> >> On Tue, Jul 22, 2014 at 6:21 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> >> LEX -)  wrote:
> >>
> >>> kafka-1028 addressed another unclean leader election problem. It
> prevents
> >>> a broker not in ISR from becoming a leader. The problem we are facing
> is
> >>> that a broker in ISR but without complete messages may become a leader.
> >>> It's also a kind of unclean leader election, but not the one that
> >>> kafka-1028 addressed.
> >>>
> >>> Here I'm trying to give a proof that current kafka doesn't achieve the
> >>> requirement (no message loss, no blocking when 1 broker down) due to
> its
> >>> two behaviors:
> >>> 1. when choosing a new leader from 2 followers in ISR, the one with
> less
> >>> messages may be chosen as the leader
> >>> 2. even when replica.lag.max.messages=0, a follower can stay in ISR
> when
> >>> it has less messages than the leader.
> >>>
> >>> We consider a cluster with 3 brokers and a topic with 3 replicas. We
> >>> analyze different cases according to the value of request.required.acks
> >>> (acks for short). For each case and it subcases, we find situations
> that
> >>> either message loss or service blocking happens. We assume that at the
> >>> beginning, all 3 replicas, leader A, followers B and C, are in sync,
> >> i.e.,
> >>> they have the same messages and are all in ISR.
> >>>
> >>> 1. acks=0, 1, 3. Obviously these settings do not satisfy the
> requirement.
> >>> 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At
> >>> this time, although C hasn't received m, it's still in ISR. If A is
> >> killed,
> >>> C can be elected as the new leader, and consumers will miss m.
> >>> 3. acks=-1. Suppose replica.lag.max.messages=M. There are two
> sub-cases:
> >>> 3.1 M>0. Suppose C be killed. C will be out of ISR after
> >>> replica.lag.time.max.ms. Th

Re: how to ensure strong consistency with reasonable availability

2014-07-22 Thread scott@heroku
Thanks Jun

Can you explain a little more about what an uncommitted message means?  The 
messages are in the log, so presumably they have been acked at least by the 
local broker.

I guess I am hoping for some intuition around why 'replaying' the messages in 
question would cause bad things.

Thanks!


Sent from my iPhone
> On Jul 22, 2014, at 3:06 PM, Jun Rao  wrote:
> 
> Scott,
> 
> The reason for truncation is that the broker that comes back may have some
> un-committed messages. Those messages shouldn't be exposed to the consumer
> and therefore need to be removed from the log. So, on broker startup, we
> first truncate the log to a safe point before which we know all messages
> are committed. This broker will then sync up with the current leader to get
> the remaining messages.
> 
> Thanks,
> 
> Jun
> 
> 
>> On Tue, Jul 22, 2014 at 9:42 AM, Scott Clasen  wrote:
>> 
>> Ahh, yes that message loss case. I've wondered about that myself.
>> 
>> I guess I dont really understand why truncating messages is ever the right
>> thing to do.  As kafka is an 'at least once' system. (send a message, get
>> no ack, it still might be on the topic) consumers that care will have to
>> de-dupe anyhow.
>> 
>> To the kafka designers:  is there anything preventing implementation of
>> alternatives to truncation? when a broker comes back online and needs to
>> truncate, cant it fire up a producer and take the extra messages and send
>> them back to the original topic or alternatively an error topic?
>> 
>> Would love to understand the rationale for the current design, as my
>> perspective is doubtfully as clear as the designers'
>> 
>> 
>> 
>> 
>> On Tue, Jul 22, 2014 at 6:21 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
>> LEX -)  wrote:
>> 
>>> kafka-1028 addressed another unclean leader election problem. It prevents
>>> a broker not in ISR from becoming a leader. The problem we are facing is
>>> that a broker in ISR but without complete messages may become a leader.
>>> It's also a kind of unclean leader election, but not the one that
>>> kafka-1028 addressed.
>>> 
>>> Here I'm trying to give a proof that current kafka doesn't achieve the
>>> requirement (no message loss, no blocking when 1 broker down) due to its
>>> two behaviors:
>>> 1. when choosing a new leader from 2 followers in ISR, the one with less
>>> messages may be chosen as the leader
>>> 2. even when replica.lag.max.messages=0, a follower can stay in ISR when
>>> it has less messages than the leader.
>>> 
>>> We consider a cluster with 3 brokers and a topic with 3 replicas. We
>>> analyze different cases according to the value of request.required.acks
>>> (acks for short). For each case and it subcases, we find situations that
>>> either message loss or service blocking happens. We assume that at the
>>> beginning, all 3 replicas, leader A, followers B and C, are in sync,
>> i.e.,
>>> they have the same messages and are all in ISR.
>>> 
>>> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
>>> 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At
>>> this time, although C hasn't received m, it's still in ISR. If A is
>> killed,
>>> C can be elected as the new leader, and consumers will miss m.
>>> 3. acks=-1. Suppose replica.lag.max.messages=M. There are two sub-cases:
>>> 3.1 M>0. Suppose C be killed. C will be out of ISR after
>>> replica.lag.time.max.ms. Then the producer publishes M messages to A and
>>> B. C restarts. C will join in ISR since it is M messages behind A and B.
>>> Before C replicates all messages, A is killed, and C becomes leader, then
>>> message loss happens.
>>> 3.2 M=0. In this case, when the producer publishes at a high speed, B and
>>> C will fail out of ISR, only A keeps receiving messages. Then A is
>> killed.
>>> Either message loss or service blocking will happen, depending on whether
>>> unclean leader election is enabled.
>>> 
>>> 
>>> From: users@kafka.apache.org At: Jul 21 2014 22:28:18
>>> To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -),
>> users@kafka.apache.org
>>> Subject: Re: how to ensure strong consistency with reasonable
>> availability
>>> 
>>> You will probably need 0.8.2  which gives
>>> https://issues.apache.org/jira/browse/KAFKA-1028
>>> 
>>> 
>>> On Mon, Jul 21, 2014 at 6:37 PM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
>>> LEX -)  wrote:
>>> 
 Hi everyone,
 
 With a cluster of 3 brokers and a topic of 3 replicas, we want to
>> achieve
 the following two properties:
 1. when only one broker is down, there's no message loss, and
 procuders/consumers are not blocked.
 2. in other more serious problems, for example, one broker is restarted
 twice in a short period or two brokers are down at the same time,
 producers/consumers can be blocked, but no message loss is allowed.
 
 We haven't found any producer/broker paramter combinations that achieve
 this. If you know or think some configurations will work, please post

Re: how to ensure strong consistency with reasonable availability

2014-07-22 Thread Jun Rao
Scott,

The reason for truncation is that the broker that comes back may have some
un-committed messages. Those messages shouldn't be exposed to the consumer
and therefore need to be removed from the log. So, on broker startup, we
first truncate the log to a safe point before which we know all messages
are committed. This broker will then sync up with the current leader to get
the remaining messages.

Thanks,

Jun


On Tue, Jul 22, 2014 at 9:42 AM, Scott Clasen  wrote:

> Ahh, yes that message loss case. I've wondered about that myself.
>
> I guess I dont really understand why truncating messages is ever the right
> thing to do.  As kafka is an 'at least once' system. (send a message, get
> no ack, it still might be on the topic) consumers that care will have to
> de-dupe anyhow.
>
> To the kafka designers:  is there anything preventing implementation of
> alternatives to truncation? when a broker comes back online and needs to
> truncate, cant it fire up a producer and take the extra messages and send
> them back to the original topic or alternatively an error topic?
>
> Would love to understand the rationale for the current design, as my
> perspective is doubtfully as clear as the designers'
>
>
>
>
> On Tue, Jul 22, 2014 at 6:21 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> LEX -)  wrote:
>
> > kafka-1028 addressed another unclean leader election problem. It prevents
> > a broker not in ISR from becoming a leader. The problem we are facing is
> > that a broker in ISR but without complete messages may become a leader.
> > It's also a kind of unclean leader election, but not the one that
> > kafka-1028 addressed.
> >
> > Here I'm trying to give a proof that current kafka doesn't achieve the
> > requirement (no message loss, no blocking when 1 broker down) due to its
> > two behaviors:
> > 1. when choosing a new leader from 2 followers in ISR, the one with less
> > messages may be chosen as the leader
> > 2. even when replica.lag.max.messages=0, a follower can stay in ISR when
> > it has less messages than the leader.
> >
> > We consider a cluster with 3 brokers and a topic with 3 replicas. We
> > analyze different cases according to the value of request.required.acks
> > (acks for short). For each case and it subcases, we find situations that
> > either message loss or service blocking happens. We assume that at the
> > beginning, all 3 replicas, leader A, followers B and C, are in sync,
> i.e.,
> > they have the same messages and are all in ISR.
> >
> > 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
> > 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At
> > this time, although C hasn't received m, it's still in ISR. If A is
> killed,
> > C can be elected as the new leader, and consumers will miss m.
> > 3. acks=-1. Suppose replica.lag.max.messages=M. There are two sub-cases:
> > 3.1 M>0. Suppose C be killed. C will be out of ISR after
> > replica.lag.time.max.ms. Then the producer publishes M messages to A and
> > B. C restarts. C will join in ISR since it is M messages behind A and B.
> > Before C replicates all messages, A is killed, and C becomes leader, then
> > message loss happens.
> > 3.2 M=0. In this case, when the producer publishes at a high speed, B and
> > C will fail out of ISR, only A keeps receiving messages. Then A is
> killed.
> > Either message loss or service blocking will happen, depending on whether
> > unclean leader election is enabled.
> >
> >
> > From: users@kafka.apache.org At: Jul 21 2014 22:28:18
> > To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -),
> users@kafka.apache.org
> > Subject: Re: how to ensure strong consistency with reasonable
> availability
> >
> > You will probably need 0.8.2  which gives
> > https://issues.apache.org/jira/browse/KAFKA-1028
> >
> >
> > On Mon, Jul 21, 2014 at 6:37 PM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> > LEX -)  wrote:
> >
> > > Hi everyone,
> > >
> > > With a cluster of 3 brokers and a topic of 3 replicas, we want to
> achieve
> > > the following two properties:
> > > 1. when only one broker is down, there's no message loss, and
> > > procuders/consumers are not blocked.
> > > 2. in other more serious problems, for example, one broker is restarted
> > > twice in a short period or two brokers are down at the same time,
> > > producers/consumers can be blocked, but no message loss is allowed.
> > >
> > > We haven't found any producer/broker paramter combinations that achieve
> > > this. If you know or think some configurations will work, please post
> > > details. We have a test bed to verify any given configurations.
> > >
> > > In addition, I'm wondering if it's necessary to open a jira to require
> > the
> > > above feature?
> > >
> > > Thanks,
> > > Jiang
> >
> >
> >
>


Re: how to ensure strong consistency with reasonable availability

2014-07-22 Thread Jun Rao
Jiang,

In case 3.1, when C restarts, the protocol is that C can only join ISR if
it has received all messages up to the current high watermark.

For example, let's assume that M is 10. Let's say A, B, C all have messages
at offset 100 and all those messages are committed (therefore high
watermark is at 100). Then C dies. After that, we commit 5 more messages
with both A and B (high watermark is at 105). Now, C is restarted. C is
actually not allowed to rejoin ISR until its log end offset has passed 105.
This means that C must first fetch the 5 newly committed messages before
being added to ISR.
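
As a sketch (invented names, not the broker's actual code), the rejoin rule
amounts to a simple comparison against the high watermark:

public class IsrRejoinSketch {
    // A restarted follower may re-enter the ISR only once its log end offset
    // has caught up to the current high watermark.
    static boolean canRejoinIsr(long followerLogEndOffset, long highWatermark) {
        return followerLogEndOffset >= highWatermark;
    }

    public static void main(String[] args) {
        long highWatermark = 105;                              // A and B have committed 105 messages
        System.out.println(canRejoinIsr(100, highWatermark));  // false: C must first fetch the 5 new messages
        System.out.println(canRejoinIsr(105, highWatermark));  // true: C can now be added back to the ISR
    }
}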

Are you observing data loss in this case?

Thanks,

Jun


On Tue, Jul 22, 2014 at 6:21 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
LEX -)  wrote:

> kafka-1028 addressed another unclean leader election problem. It prevents
> a broker not in ISR from becoming a leader. The problem we are facing is
> that a broker in ISR but without complete messages may become a leader.
> It's also a kind of unclean leader election, but not the one that
> kafka-1028 addressed.
>
> Here I'm trying to give a proof that current kafka doesn't achieve the
> requirement (no message loss, no blocking when 1 broker down) due to its
> two behaviors:
> 1. when choosing a new leader from 2 followers in ISR, the one with less
> messages may be chosen as the leader
> 2. even when replica.lag.max.messages=0, a follower can stay in ISR when
> it has less messages than the leader.
>
> We consider a cluster with 3 brokers and a topic with 3 replicas. We
> analyze different cases according to the value of request.required.acks
> (acks for short). For each case and it subcases, we find situations that
> either message loss or service blocking happens. We assume that at the
> beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e.,
> they have the same messages and are all in ISR.
>
> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
> 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At
> this time, although C hasn't received m, it's still in ISR. If A is killed,
> C can be elected as the new leader, and consumers will miss m.
> 3. acks=-1. Suppose replica.lag.max.messages=M. There are two sub-cases:
> 3.1 M>0. Suppose C be killed. C will be out of ISR after
> replica.lag.time.max.ms. Then the producer publishes M messages to A and
> B. C restarts. C will join in ISR since it is M messages behind A and B.
> Before C replicates all messages, A is killed, and C becomes leader, then
> message loss happens.
> 3.2 M=0. In this case, when the producer publishes at a high speed, B and
> C will fail out of ISR, only A keeps receiving messages. Then A is killed.
> Either message loss or service blocking will happen, depending on whether
> unclean leader election is enabled.
>
>
> From: users@kafka.apache.org At: Jul 21 2014 22:28:18
> To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -), users@kafka.apache.org
> Subject: Re: how to ensure strong consistency with reasonable availability
>
> You will probably need 0.8.2  which gives
> https://issues.apache.org/jira/browse/KAFKA-1028
>
>
> On Mon, Jul 21, 2014 at 6:37 PM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> LEX -)  wrote:
>
> > Hi everyone,
> >
> > With a cluster of 3 brokers and a topic of 3 replicas, we want to achieve
> > the following two properties:
> > 1. when only one broker is down, there's no message loss, and
> > procuders/consumers are not blocked.
> > 2. in other more serious problems, for example, one broker is restarted
> > twice in a short period or two brokers are down at the same time,
> > producers/consumers can be blocked, but no message loss is allowed.
> >
> > We haven't found any producer/broker paramter combinations that achieve
> > this. If you know or think some configurations will work, please post
> > details. We have a test bed to verify any given configurations.
> >
> > In addition, I'm wondering if it's necessary to open a jira to require
> the
> > above feature?
> >
> > Thanks,
> > Jiang
>
>
>


Re: [DISCUSS] Kafka Security Specific Features

2014-07-22 Thread Pramod Deshmukh
Is anyone else getting this issue? Is it something related to the environment, or is
it the code? The producer works fine when run with secure=false (no security) mode.


pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true
--topic secureTopic

[2014-07-18 13:12:29,817] WARN Property topic is not valid
(kafka.utils.VerifiableProperties)

Hare Krishna

[2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id
0 for topics [Set(secureTopic)] from broker
[id:0,host:localhost,port:9092,secure:true] failed
(kafka.client.ClientUtils$)

java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.

at kafka.utils.Utils$.read(Utils.scala:381)

at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)

at kafka.network.Receive$class.readCompletely(Transmission.scala:56)

at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)

at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)

at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)

at kafka.producer.SyncProducer.send(SyncProducer.scala:117)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)

at kafka.utils.Utils$.swallow(Utils.scala:172)

at kafka.utils.Logging$class.swallowError(Logging.scala:106)

at kafka.utils.Utils$.swallowError(Utils.scala:45)

at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)

at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)

at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)

at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)

at scala.collection.immutable.Stream.foreach(Stream.scala:526)

at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)


On Fri, Jul 18, 2014 at 1:20 PM, Pramod Deshmukh  wrote:

> Thanks Joe, I don't see any Out of memory error. Now I get exception when
> Producer fetches metadata for a topic
>
> Here is how I created the topic and run producer
>
> pdeshmukh$ bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 1 --topic secureTopic
> Created topic "secureTopic".
>
> pdeshmukh$ bin/kafka-topics.sh --list --zookeeper localhost:2181
>
> secure.test
>
> secureTopic
>
> >> Run producer, tried both localhost:9092:true and localhost:9092
>
> pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true
> --topic secureTopic
>
> [2014-07-18 13:12:29,817] WARN Property topic is not valid
> (kafka.utils.VerifiableProperties)
>
> Hare Krishna
>
> [2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(secureTopic)] from broker
> [id:0,host:localhost,port:9092,secure:true] failed
> (kafka.client.ClientUtils$)
>
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
>
> at kafka.utils.Utils$.read(Utils.scala:381)
>
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>
> at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
>
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>
> at kafka.utils.Utils$.swallow(Utils.scala:172)
>
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
>
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>
> at scala.collection.immutable.Stream.foreach(Stream.scala:526)
>
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>
> at kafka.producer.async.ProducerSendThr

Re: how to ensure strong consistency with reasonable availability

2014-07-22 Thread Guozhang Wang
Hello Jiang,

If some of your brokers have frequent soft failures caused by long GC pauses, you
can take a look at the operations wiki page to see if it helps with tuning the GC
settings.

https://cwiki.apache.org/confluence/display/KAFKA/Operations

Guozhang


On Tue, Jul 22, 2014 at 9:42 AM, Scott Clasen  wrote:

> Ahh, yes that message loss case. I've wondered about that myself.
>
> I guess I dont really understand why truncating messages is ever the right
> thing to do.  As kafka is an 'at least once' system. (send a message, get
> no ack, it still might be on the topic) consumers that care will have to
> de-dupe anyhow.
>
> To the kafka designers:  is there anything preventing implementation of
> alternatives to truncation? when a broker comes back online and needs to
> truncate, cant it fire up a producer and take the extra messages and send
> them back to the original topic or alternatively an error topic?
>
> Would love to understand the rationale for the current design, as my
> perspective is doubtfully as clear as the designers'
>
>
>
>
> On Tue, Jul 22, 2014 at 6:21 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> LEX -)  wrote:
>
> > kafka-1028 addressed another unclean leader election problem. It prevents
> > a broker not in ISR from becoming a leader. The problem we are facing is
> > that a broker in ISR but without complete messages may become a leader.
> > It's also a kind of unclean leader election, but not the one that
> > kafka-1028 addressed.
> >
> > Here I'm trying to give a proof that current kafka doesn't achieve the
> > requirement (no message loss, no blocking when 1 broker down) due to its
> > two behaviors:
> > 1. when choosing a new leader from 2 followers in ISR, the one with less
> > messages may be chosen as the leader
> > 2. even when replica.lag.max.messages=0, a follower can stay in ISR when
> > it has less messages than the leader.
> >
> > We consider a cluster with 3 brokers and a topic with 3 replicas. We
> > analyze different cases according to the value of request.required.acks
> > (acks for short). For each case and it subcases, we find situations that
> > either message loss or service blocking happens. We assume that at the
> > beginning, all 3 replicas, leader A, followers B and C, are in sync,
> i.e.,
> > they have the same messages and are all in ISR.
> >
> > 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
> > 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At
> > this time, although C hasn't received m, it's still in ISR. If A is
> killed,
> > C can be elected as the new leader, and consumers will miss m.
> > 3. acks=-1. Suppose replica.lag.max.messages=M. There are two sub-cases:
> > 3.1 M>0. Suppose C be killed. C will be out of ISR after
> > replica.lag.time.max.ms. Then the producer publishes M messages to A and
> > B. C restarts. C will join in ISR since it is M messages behind A and B.
> > Before C replicates all messages, A is killed, and C becomes leader, then
> > message loss happens.
> > 3.2 M=0. In this case, when the producer publishes at a high speed, B and
> > C will fail out of ISR, only A keeps receiving messages. Then A is
> killed.
> > Either message loss or service blocking will happen, depending on whether
> > unclean leader election is enabled.
> >
> >
> > From: users@kafka.apache.org At: Jul 21 2014 22:28:18
> > To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -),
> users@kafka.apache.org
> > Subject: Re: how to ensure strong consistency with reasonable
> availability
> >
> > You will probably need 0.8.2  which gives
> > https://issues.apache.org/jira/browse/KAFKA-1028
> >
> >
> > On Mon, Jul 21, 2014 at 6:37 PM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> > LEX -)  wrote:
> >
> > > Hi everyone,
> > >
> > > With a cluster of 3 brokers and a topic of 3 replicas, we want to
> achieve
> > > the following two properties:
> > > 1. when only one broker is down, there's no message loss, and
> > > procuders/consumers are not blocked.
> > > 2. in other more serious problems, for example, one broker is restarted
> > > twice in a short period or two brokers are down at the same time,
> > > producers/consumers can be blocked, but no message loss is allowed.
> > >
> > > We haven't found any producer/broker paramter combinations that achieve
> > > this. If you know or think some configurations will work, please post
> > > details. We have a test bed to verify any given configurations.
> > >
> > > In addition, I'm wondering if it's necessary to open a jira to require
> > the
> > > above feature?
> > >
> > > Thanks,
> > > Jiang
> >
> >
> >
>



-- 
-- Guozhang


Re: New Consumer Design

2014-07-22 Thread Robert Withers
Hi Jay,

For sure, and a capability protocol adds even more.  It's down to cost/benefit, 
a nice place to be.

- Rob

> On Jul 22, 2014, at 10:46 AM, Jay Kreps  wrote:
> 
> Hey Robert,
> 
> I think the issue is that you lose much of the simplification if you
> still need to heart beat and you need a new commit notification
> message. I think that would make this proposal more complex than the
> current one.
> 
> -Jay
> 
> On Mon, Jul 21, 2014 at 6:44 PM, Robert Withers
>  wrote:
>> Thanks, Jay, for the good summary.  Regarding point 2, I would think the 
>> heartbeat would still be desired, to give control over liveness detection 
>> parameters and to directly inform clients when gaining or losing a partition 
>> (especially when gaining a partition).  There would be no barrier and the 
>> rebalancer would be an offline scheduler, issuing SwitchPartition commands.  
>> The evaluation of a SwitchPartition command would await the consumer losing 
>> a partition to commit offset and any local commit work needed before 
>> confirming completion to the co-ordinator, which would then inform the new 
>> consumer and ISR brokers about the partition gain.  Broker security would be 
>> the master record of love assignments.
>> 
>> Thanks,
>> Rob
>> 
>>> On Jul 21, 2014, at 6:10 PM, Jay Kreps  wrote:
>>> 
>>> This thread is a bit long, but let me see if I can restate it
>>> correctly (not sure I fully follow).
>>> 
>>> There are two suggestions:
>>> 1. Allow partial rebalances that move just some partitions. I.e. if a
>>> consumer fails and has only one partition only one other consumer
>>> should be effected (the one who picks up the new partition). If there
>>> are many partitions to be reassigned there will obviously be a
>>> tradeoff between impacting all consumers and balancing load evenly.
>>> I.e. if you moved all load to one other consumer that would cause
>>> little rebalancing interruption but poor load balancing.
>>> 2. Have the co-ordinator communicate the assignments to the brokers
>>> rather than to the client directly. This could potentially simplify
>>> the consumer. Perhaps it would be possible to have the leader track
>>> liveness using the fetch requests rather than needing an artificial
>>> heartbeat.
>>> 
>>> These are interesting ideas.
>>> 
>>> -Jay
>>> 
>>> 
>>> 
 On Mon, Jul 21, 2014 at 4:46 PM, Guozhang Wang  wrote:
 Hello Rob,
 
 If I get your idea right, the idea is that if the rebalance only changes
 the ownership of a few consumers in the group, the coordinator can just
 sync with them and do not interrupt with other consumers.
 
 I think this approach may work. However it will likely complicates the
 logic of coordinator after we sketch out all the details since the
 rebalance results is basically depend on two variables: 1) partitions for
 the subscribed topics, 2) consumers inside the group, hence following this
 approach by the time the coordinator decides to trigger a rebalance, it
 must correctly keep track of which variable changes triggers the current
 rebalance process; on the other hand, the normal rebalance process even
 with a global barrier should usually be very fast, with a few hundreds of
 millis. So I am not sure if this is a worthy optimization that we would
 want for now. What do you think?
 
 Guozhang
 
 
 On Sat, Jul 19, 2014 at 12:33 PM, Robert Withers 
  wrote:
 
> Lock is a bad way to say it; a barrier is better.  I don't think what I am
> saying is even a barrier, since the rebalance would just need to recompute
> a rebalance schedule and submit it.  The only processing delay is to allow
> a soft remove to let the client cleanup, before you turn on the new guy, 
> so
> it lags a bit.  Do you think this could this work?
> 
> Thanks,
> Rob
> 
>>> On Jul 18, 2014, at 7:22 PM, Robert Withers 
>> wrote:
>> 
>> Hi Guozhang,
>> 
>> Thank you for considering my suggestions.  The security layer sounds
> like the right facet to design for these sorts of capabilities.  Have you
> considered a chained ocap security model for the broker using hash tokens?
> This would provide for per-partition read/write capabilities with QoS
> context including leases, revocation, debug level and monitoring.  
> Overkill
> disappears as no domain specific info needs to be stored at the brokers,
> like consumer/partition assignments.  The read ocap for consumer 7/topic
> bingo/partition 131 could be revoked at the brokers for a partition and
> subsequent reads would fail the fetch for requests with that ocap token.
> You could also dynamically change the log level for a specific
> consumer/partition.
>> 
>> There are advantages we could discuss to having finer grained control.
> Consider that scheduled partition rebalancing could be implemented with no
> pauses from the per

Re: New Consumer Design

2014-07-22 Thread Jay Kreps
Hey Robert,

I think the issue is that you lose much of the simplification if you
still need to heartbeat and you need a new commit notification
message. I think that would make this proposal more complex than the
current one.

-Jay

On Mon, Jul 21, 2014 at 6:44 PM, Robert Withers
 wrote:
> Thanks, Jay, for the good summary.  Regarding point 2, I would think the 
> heartbeat would still be desired, to give control over liveness detection 
> parameters and to directly inform clients when gaining or losing a partition 
> (especially when gaining a partition).  There would be no barrier and the 
> rebalancer would be an offline scheduler, issuing SwitchPartition commands.  
> The evaluation of a SwitchPartition command would await the consumer losing a 
> partition to commit offset and any local commit work needed before confirming 
> completion to the co-ordinator, which would then inform the new consumer and 
> ISR brokers about the partition gain.  Broker security would be the master 
> record of love assignments.
>
> Thanks,
> Rob
>
>> On Jul 21, 2014, at 6:10 PM, Jay Kreps  wrote:
>>
>> This thread is a bit long, but let me see if I can restate it
>> correctly (not sure I fully follow).
>>
>> There are two suggestions:
>> 1. Allow partial rebalances that move just some partitions. I.e. if a
>> consumer fails and has only one partition only one other consumer
>> should be effected (the one who picks up the new partition). If there
>> are many partitions to be reassigned there will obviously be a
>> tradeoff between impacting all consumers and balancing load evenly.
>> I.e. if you moved all load to one other consumer that would cause
>> little rebalancing interruption but poor load balancing.
>> 2. Have the co-ordinator communicate the assignments to the brokers
>> rather than to the client directly. This could potentially simplify
>> the consumer. Perhaps it would be possible to have the leader track
>> liveness using the fetch requests rather than needing an artificial
>> heartbeat.
>>
>> These are interesting ideas.
>>
>> -Jay
>>
>>
>>
>>> On Mon, Jul 21, 2014 at 4:46 PM, Guozhang Wang  wrote:
>>> Hello Rob,
>>>
>>> If I get your idea right, the idea is that if the rebalance only changes
>>> the ownership of a few consumers in the group, the coordinator can just
>>> sync with them and do not interrupt with other consumers.
>>>
>>> I think this approach may work. However it will likely complicates the
>>> logic of coordinator after we sketch out all the details since the
>>> rebalance results is basically depend on two variables: 1) partitions for
>>> the subscribed topics, 2) consumers inside the group, hence following this
>>> approach by the time the coordinator decides to trigger a rebalance, it
>>> must correctly keep track of which variable changes triggers the current
>>> rebalance process; on the other hand, the normal rebalance process even
>>> with a global barrier should usually be very fast, with a few hundreds of
>>> millis. So I am not sure if this is a worthy optimization that we would
>>> want for now. What do you think?
>>>
>>> Guozhang
>>>
>>>
>>> On Sat, Jul 19, 2014 at 12:33 PM, Robert Withers wrote:
>>>
 Lock is a bad way to say it; a barrier is better.  I don't think what I am
 saying is even a barrier, since the rebalance would just need to recompute
 a rebalance schedule and submit it.  The only processing delay is to allow
 a soft remove to let the client cleanup, before you turn on the new guy, so
 it lags a bit.  Do you think this could this work?

 Thanks,
 Rob

>> On Jul 18, 2014, at 7:22 PM, Robert Withers 
> wrote:
>
> Hi Guozhang,
>
> Thank you for considering my suggestions.  The security layer sounds
 like the right facet to design for these sorts of capabilities.  Have you
 considered a chained ocap security model for the broker using hash tokens?
 This would provide for per-partition read/write capabilities with QoS
 context including leases, revocation, debug level and monitoring.  Overkill
 disappears as no domain specific info needs to be stored at the brokers,
 like consumer/partition assignments.  The read ocap for consumer 7/topic
 bingo/partition 131 could be revoked at the brokers for a partition and
 subsequent reads would fail the fetch for requests with that ocap token.
 You could also dynamically change the log level for a specific
 consumer/partition.
>
> There are advantages we could discuss to having finer grained control.
 Consider that scheduled partition rebalancing could be implemented with no
 pauses from the perspective of the consumer threads; it looks like single
 partition lag, as the offset commit occurs before rotation, with no lag to
 non-rebalanced partitions: rebalance 1 partition per second so as to creep
 load to a newbie consumer.  It would eliminate a global read lock and even
 the internal Kafka consu

Re: how to ensure strong consistency with reasonable availability

2014-07-22 Thread Scott Clasen
Ahh, yes that message loss case. I've wondered about that myself.

I guess I don't really understand why truncating messages is ever the right
thing to do.  As Kafka is an 'at least once' system (send a message, get
no ack, it still might be on the topic), consumers that care will have to
de-dupe anyhow.

To the Kafka designers: is there anything preventing implementation of
alternatives to truncation? When a broker comes back online and needs to
truncate, can't it fire up a producer and send the extra messages
back to the original topic, or alternatively to an error topic?

Would love to understand the rationale for the current design, as my
perspective is doubtfully as clear as the designers'




On Tue, Jul 22, 2014 at 6:21 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
LEX -)  wrote:

> kafka-1028 addressed another unclean leader election problem. It prevents
> a broker not in ISR from becoming a leader. The problem we are facing is
> that a broker in ISR but without complete messages may become a leader.
> It's also a kind of unclean leader election, but not the one that
> kafka-1028 addressed.
>
> Here I'm trying to give a proof that current kafka doesn't achieve the
> requirement (no message loss, no blocking when 1 broker down) due to its
> two behaviors:
> 1. when choosing a new leader from 2 followers in ISR, the one with less
> messages may be chosen as the leader
> 2. even when replica.lag.max.messages=0, a follower can stay in ISR when
> it has less messages than the leader.
>
> We consider a cluster with 3 brokers and a topic with 3 replicas. We
> analyze different cases according to the value of request.required.acks
> (acks for short). For each case and it subcases, we find situations that
> either message loss or service blocking happens. We assume that at the
> beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e.,
> they have the same messages and are all in ISR.
>
> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
> 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At
> this time, although C hasn't received m, it's still in ISR. If A is killed,
> C can be elected as the new leader, and consumers will miss m.
> 3. acks=-1. Suppose replica.lag.max.messages=M. There are two sub-cases:
> 3.1 M>0. Suppose C be killed. C will be out of ISR after
> replica.lag.time.max.ms. Then the producer publishes M messages to A and
> B. C restarts. C will join in ISR since it is M messages behind A and B.
> Before C replicates all messages, A is killed, and C becomes leader, then
> message loss happens.
> 3.2 M=0. In this case, when the producer publishes at a high speed, B and
> C will fail out of ISR, only A keeps receiving messages. Then A is killed.
> Either message loss or service blocking will happen, depending on whether
> unclean leader election is enabled.
>
>
> From: users@kafka.apache.org At: Jul 21 2014 22:28:18
> To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -), users@kafka.apache.org
> Subject: Re: how to ensure strong consistency with reasonable availability
>
> You will probably need 0.8.2  which gives
> https://issues.apache.org/jira/browse/KAFKA-1028
>
>
> On Mon, Jul 21, 2014 at 6:37 PM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> LEX -)  wrote:
>
> > Hi everyone,
> >
> > With a cluster of 3 brokers and a topic of 3 replicas, we want to achieve
> > the following two properties:
> > 1. when only one broker is down, there's no message loss, and
> > procuders/consumers are not blocked.
> > 2. in other more serious problems, for example, one broker is restarted
> > twice in a short period or two brokers are down at the same time,
> > producers/consumers can be blocked, but no message loss is allowed.
> >
> > We haven't found any producer/broker paramter combinations that achieve
> > this. If you know or think some configurations will work, please post
> > details. We have a test bed to verify any given configurations.
> >
> > In addition, I'm wondering if it's necessary to open a jira to require
> the
> > above feature?
> >
> > Thanks,
> > Jiang
>
>
>


Re: how to ensure strong consistency with reasonable availability

2014-07-22 Thread Jiang Wu (Pricehistory) (BLOOMBERG/ 731 LEX -)
kafka-1028 addressed another unclean leader election problem. It prevents a 
broker not in ISR from becoming a leader. The problem we are facing is that a 
broker in ISR but without complete messages may become a leader. It's also a 
kind of unclean leader election, but not the one that kafka-1028 addressed.

Here I'm trying to give a proof that current kafka doesn't achieve the 
requirement (no message loss, no blocking when 1 broker down) due to its two 
behaviors:
1. when choosing a new leader from 2 followers in ISR, the one with fewer 
messages may be chosen as the leader
2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has 
fewer messages than the leader.

We consider a cluster with 3 brokers and a topic with 3 replicas. We analyze 
different cases according to the value of request.required.acks (acks for 
short). For each case and its sub-cases, we find situations in which either 
message loss or service blocking happens. We assume that at the beginning, all 3 
replicas, leader A, followers B and C, are in sync, i.e., they have the same 
messages and are all in ISR.

1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
time, although C hasn't received m, it's still in ISR. If A is killed, C can be 
elected as the new leader, and consumers will miss m.
3. acks=-1. Suppose replica.lag.max.messages=M. There are two sub-cases:
3.1 M>0. Suppose C is killed. C will be out of ISR after 
replica.lag.time.max.ms. Then the producer publishes M messages to A and B. C 
restarts. C will rejoin ISR since it is only M messages behind A and B. Before C 
replicates all messages, A is killed, and C becomes leader, then message loss 
happens.
3.2 M=0. In this case, when the producer publishes at a high speed, B and C 
will fail out of ISR, only A keeps receiving messages. Then A is killed. Either 
message loss or service blocking will happen, depending on whether unclean 
leader election is enabled.
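
To make case 2 concrete, here is a toy simulation (plain Java with invented
names; not how the broker is implemented) of the acks=2 loss:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AcksTwoLossSketch {
    public static void main(String[] args) {
        Map<String, List<String>> replicas = new HashMap<String, List<String>>();
        replicas.put("A", new ArrayList<String>(Arrays.asList("m")));  // leader, has m
        replicas.put("B", new ArrayList<String>(Arrays.asList("m")));  // follower, has m -> producer acked (acks=2)
        replicas.put("C", new ArrayList<String>());                    // follower, hasn't fetched m yet, still in ISR

        replicas.remove("A");                                          // A is killed before C catches up
        // C is still in the ISR, so it is a legal choice for the new leader...
        System.out.println("New leader C's log: " + replicas.get("C")); // [] -> consumers never see m
    }
}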


From: users@kafka.apache.org At: Jul 21 2014 22:28:18
To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -), users@kafka.apache.org
Subject: Re: how to ensure strong consistency with reasonable availability

You will probably need 0.8.2  which gives
https://issues.apache.org/jira/browse/KAFKA-1028


On Mon, Jul 21, 2014 at 6:37 PM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
LEX -)  wrote:

> Hi everyone,
>
> With a cluster of 3 brokers and a topic of 3 replicas, we want to achieve
> the following two properties:
> 1. when only one broker is down, there's no message loss, and
> procuders/consumers are not blocked.
> 2. in other more serious problems, for example, one broker is restarted
> twice in a short period or two brokers are down at the same time,
> producers/consumers can be blocked, but no message loss is allowed.
>
> We haven't found any producer/broker paramter combinations that achieve
> this. If you know or think some configurations will work, please post
> details. We have a test bed to verify any given configurations.
>
> In addition, I'm wondering if it's necessary to open a jira to require the
> above feature?
>
> Thanks,
> Jiang




Re: request.required.acks=-1 under high data volume

2014-07-22 Thread Michal Michalski
> Interesting, I had missed that. Is it worth updating the documentation to
> make that more explicit, or do other people find it clear enough?

I agree, it's easy to miss; we missed that information too - we noticed it
only a few days ago, even though we've been using Kafka for weeks and spent
long hours reading the docs :-) I think that listing the possible values
suggests that they're the only valid ones, while adding more general
information, with a sample explanation for 1 and -1, would be more helpful
and easier to notice.

M.

On 22 July 2014 03:20, Daniel Compton  wrote:

> Interesting, I had missed that. Is it worth updating the documentation to
> make that more explicit, or do other people find it clear enough?
>
>
> On 22 July 2014 12:47, Jiang Wu (Pricehistory) (BLOOMBERG/ 731 LEX -) <
> jwu...@bloomberg.net> wrote:
>
> > The document says "typical" values, not "valid" values, are 0, 1, -1. In
> > fact any integer will be accepted.
> >
> > From: users@kafka.apache.org At: Jul 21 2014 18:54:56
> > To: users@kafka.apache.org
> > Subject: Re: request.required.acks=-1 under high data volume
> >
> > In the docs for 0.8.1.1, there are only three options for
> > request.required.acks
> > , {-1, 0,
> 1}.
> > How is request.required.acks=3 a valid configuration property? Am I
> reading
> > it incorrectly or are the docs out of date?
> >
> >
> > On 18 July 2014 06:25, Neha Narkhede  wrote:
> >
> > > Filed https://issues.apache.org/jira/browse/KAFKA-1546 to track the
> > > improvement. It is also a good ticket for some one to jump on, to learn
> > > more about the replication code base.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Mon, Jul 14, 2014 at 7:54 AM, Jun Rao  wrote:
> > >
> > > > Yes, it is true that if all replicas fall out of isr, ack with -1 is
> > the
> > > > same as 1. Normally, we don't expect replicas to fall out of isr
> > though.
> > > > You may want to read
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowtoreducechurnsinISR?WhendoesabrokerleavetheISR
> > > > ?
> > > > to see how to minimize that.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Jul 14, 2014 at 6:36 AM, Jiang Wu (Pricehistory) (BLOOMBERG/
> > 731
> > > > LEX -)  wrote:
> > > >
> > > > > Hi Jay,
> > > > > Thanks for explaining the lag detection mechanism. I think my real
> > > > concern
> > > > > is from the description of request.required.acks=-1 from kafka's
> > > > document:
> > > > > "-1, which means that the producer gets an acknowledgement after
> all
> > > > > in-sync replicas have received the data. This option provides the
> > best
> > > > > durability, we guarantee that no messages will be lost as long as
> at
> > > > least
> > > > > one in sync replica remains."
> > > > > Since it states that acks=-1 provides the best durability, I had
> > > thought
> > > > > it's equivalent to acks=3 for a topic with replicas 3. My
> > understanding
> > > > is
> > > > > that, acks=3 provides the best durability for such a topic, better
> > than
> > > > > ack=2 and ack=1. But because followers may fail out of sync,
> acks=-1
> > > > > actually provides the same level of durability as acks=1. It seems
> to
> > > me
> > > > > there's inconsistency between the behavior of ack=-1 and its
> > > description,
> > > > > therefore one of them may need to be modified.
> > > > >
> > > > > Regards,
> > > > > Jiang
> > > > >
> > > > > From: users@kafka.apache.org At: Jul 11 2014 18:27:46
> > > > > To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -),
> > > > users@kafka.apache.org
> > > > > Cc: wangg...@gmail.com
> > > > > Subject: Re: request.required.acks=-1 under high data volume
> > > > >
> > > > > I think the root problem is that replicas are falling behind and
> > hence
> > > > > are effectively "failed" under normal load and also that you have
> > > > > unclean leader election enabled which "solves" this catastrophic
> > > > > failure by electing new leaders without complete data.
> > > > >
> > > > > Starting in 0.8.2 you will be able to selectively disable unclean
> > > > > leader election.
> > > > >
> > > > > The root problem for the spuriously failing replicas is the
> > > > > configuration replica.lag.max.messages. This configuration defaults
> > to
> > > > > 4000. But throughput can be really high, like a million messages
> per
> > > > > second. At a million messages per second, 4k messages of lag is
> only
> > > > > 4ms behind, which can happen for all kinds of reasons (e.g. just
> > > > > normal linux i/o latency jitter).
> > > > >
> > > > > Jiang, I suspect you can resolve your issue by just making this
> > higher.
> > > > >
> > > > > However, raising this setting is not a panacea. The higher you
> raise
> > > > > it the longer it will take to detect a partition that is actually
> > > > > falling behind.
> > > > >
> > > > > We have been discussing this setting, and if you think about it th