Re: [DISCUSS] Improving ACLs by allowing ip ranges and subnet expressions?

2018-01-24 Thread Sönke Liebau
Hi Colin,

I agree with you that IP-based security is not absolute. I was
considering it as an additional layer of security to be used in conjunction
with SSL certificates, so a rule would contain both the principal and
some hosts. That way, if someone manages to obtain the certificate, he'd need
to jump through extra hoops to use it from outside the cluster when it's not
feasible to lock down Kafka with a firewall.

Mostly though, I'd argue on principle that if we consider the feature worth
having, it should be "done right" - otherwise we might as well remove it to
avoid giving users a false sense of security.

Regarding your suggestion of access control without security, we could
start honouring the HADOOP_USER_NAME environment variable; many people
should already be used to that :)
I'm not sure there is a lot of demand for that feature though - I'd consider
it more dangerous than useful, but that is really just a personal opinion.

Best regards,
Sönke

On 24.01.2018 at 23:31, "Colin McCabe" wrote:

Hi Sonke,

IP address based security doesn't really work, though.  Users can spoof IP
addresses.  They can poison the ARP cache on a local network, or
impersonate a DNS server.

For users who want some access controls, but don't care about security,
maybe we should make it easier to use and create users without enabling
kerberos or similar?

best,
Colin


On Wed, Jan 24, 2018, at 12:59, Sönke Liebau wrote:
> Hi everyone,
>
> the current ACL functionality in Kafka is a bit limited concerning
> host-based rules when specifying multiple hosts. A common scenario for
> this would be that you have a YARN cluster running Spark jobs that
> access Kafka and want to create ACLs based on the IP addresses of the
> cluster nodes.
> Currently kafka-acls only allows specifying individual IPs, so this
> would look like
>
> ./kafka-acls --add --producer \
> --topic test --authorizer-properties zookeeper.connect=localhost:2181 \
> --allow-principal User:spark \
> --allow-host 10.0.0.10 \
> --allow-host 10.0.0.11 \
> --allow-host ...
>
> which can get unwieldy if you have a 200 node cluster. Internally this
> command would not create a single ACL with multiple host entries, but
> rather one ACL per host that is specified on the command line, which
> makes the ACL listing a bit confusing.
>
> There are currently a few jiras in various states around this topic:
> KAFKA-3531 [1], KAFKA-4759 [2], KAFKA-4985 [3] & KAFKA-5713 [4]
>
> KAFKA-4759 has a patch available, but would currently only add
> interpretation of CIDR notation, no specific ranges, which I think
> could easily be added.
>
> Colin McCabe commented in KAFKA-4985 that this had not been implemented
> so far because no standard for expressing IP ranges with a fast
> implementation had been found. The available patch uses the
> ipmath [5] package for parsing expressions and range checking - which
> seems fairly small and focused.
>
> This would allow for expressions of the following type:
> 10.0.0.1
> 10.0.0.1-10.0.0.10
> 10.0.0.0/24
>
> I'd suggest extending this a little to allow a semicolon separated
> list of values:
> 10.0.0.1;10.0.0.1-10.0.0.10;10.0.0.0/24
>
> Performance considerations
> Internally the ipmath package represents IP addresses as longs, so if
> we stick with the example of a 200-node cluster from above, the
> current implementation would do 200 string comparisons for every
> request, whereas with a range it could potentially come down to two
> long comparisons. This is of course a back-of-the-envelope calculation
> at best, but there at least seems to be a case for investigating this
> a bit further.
>
>
> These changes would probably necessitate a KIP - though with some
> consideration they could be made in a way that no existing public
> facing functionality is changed, but for transparency and proper
> documentation I'd say a KIP would be preferable.
>
> I'd be happy to draft one if people think this is worthwhile.
>
> Let me know what you think.
>
> best regards,
> Sönke
>
> [1] https://issues.apache.org/jira/browse/KAFKA-3531
> [2] https://issues.apache.org/jira/browse/KAFKA-4759
> [3] https://issues.apache.org/jira/browse/KAFKA-4985
> [4] https://issues.apache.org/jira/browse/KAFKA-5713
> [5] https://github.com/jgonian/commons-ip-math
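A minimal sketch (not the actual patch, and not using the Authorizer interface or the ipmath API directly) of what the proposed matching could look like. It parses the suggested expression syntax into long ranges - the same representation ipmath uses internally for IPv4 - and shows why a 200-node cluster can be covered by a handful of range checks instead of 200 string comparisons:

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: parses the proposed expression syntax
// (single IP, "start-end" range, CIDR) into [low, high] long ranges,
// mirroring what the ipmath library does internally for IPv4.
public class AllowedHosts {
    private final List<long[]> ranges = new ArrayList<>();

    public AllowedHosts(String expression) {
        for (String part : expression.split(";"))
            ranges.add(parse(part.trim()));
    }

    // True if the client address falls inside any configured range.
    public boolean matches(String clientIp) {
        long ip = toLong(clientIp);
        for (long[] r : ranges)
            if (ip >= r[0] && ip <= r[1])   // two long comparisons per range
                return true;
        return false;
    }

    private static long[] parse(String s) {
        if (s.contains("-")) {                        // e.g. 10.0.0.1-10.0.0.10
            String[] p = s.split("-");
            return new long[] { toLong(p[0]), toLong(p[1]) };
        } else if (s.contains("/")) {                 // e.g. 10.0.0.0/24
            String[] p = s.split("/");
            int prefix = Integer.parseInt(p[1]);
            long mask = prefix == 0 ? 0 : (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
            long low = toLong(p[0]) & mask;
            return new long[] { low, low | (~mask & 0xFFFFFFFFL) };
        } else {                                      // single address
            long ip = toLong(s);
            return new long[] { ip, ip };
        }
    }

    private static long toLong(String ip) {
        long result = 0;
        for (String octet : ip.split("\\."))
            result = (result << 8) | Integer.parseInt(octet);
        return result;
    }
}

For example, new AllowedHosts("10.0.0.1;10.0.0.1-10.0.0.10;10.0.0.0/24").matches("10.0.0.77") returns true because of the /24 entry, while matches("10.0.1.1") returns false.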


Jenkins build is back to normal : kafka-trunk-jdk7 #3115

2018-01-24 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk9 #330

2018-01-24 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6462: fix unstable ResetIntegrationTest (#4446)

--
[...truncated 1.48 MB...]
org.apache.kafka.streams.integration.EosIntegrationTest > 
shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions STARTED

org.apache.kafka.streams.integration.EosIntegrationTest > 
shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions PASSED

org.apache.kafka.streams.integration.EosIntegrationTest > 
shouldBeAbleToRestartAfterClose STARTED

org.apache.kafka.streams.integration.EosIntegrationTest > 
shouldBeAbleToRestartAfterClose PASSED
ERROR: No tool found matching GRADLE_3_4_RC_2_HOME
ERROR: Could not install GRADLE_4_3_HOME
java.lang.NullPointerException

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled STARTED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled PASSED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldRestoreState STARTED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldRestoreState PASSED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldSuccessfullyStartWhenLoggingDisabled STARTED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldSuccessfullyStartWhenLoggingDisabled PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = false] PASSED


Re: [VOTE] KIP-232: Detect outdated metadata using leaderEpoch and partitionEpoch

2018-01-24 Thread Dong Lin
Hey Colin,

Thanks for reviewing the KIP.

If I understand you right, you may be suggesting that we can use a global
metadataEpoch that is incremented every time the controller updates metadata.
The problem with this solution is that, if a topic is deleted and created
again, the user will not know that the offset which was stored before
the topic deletion is no longer valid. This motivates the idea of including a
per-partition partitionEpoch. Does this sound reasonable?

Then the next question may be: should we use a global metadataEpoch +
per-partition partitionEpoch, instead of per-partition leaderEpoch +
per-partition partitionEpoch? The former solution using metadataEpoch would
not work due to the following scenario (provided by Jun):

"Consider the following scenario. In metadata v1, the leader for a
partition is at broker 1. In metadata v2, leader is at broker 2. In
metadata v3, leader is at broker 1 again. The last committed offset in v1,
v2 and v3 are 10, 20 and 30, respectively. A consumer is started and reads
metadata v1 and reads messages from offset 0 to 25 from broker 1. My
understanding is that in the current proposal, the metadata version
associated with offset 25 is v1. The consumer is then restarted and fetches
metadata v2. The consumer tries to read from broker 2, which is the old
leader with the last offset at 20. In this case, the consumer will still
get OffsetOutOfRangeException incorrectly."

Regarding your comment "For the second purpose, this is "soft state"
anyway.  If the client thinks X is the leader but Y is really the leader,
the client will talk to X, and X will point out its mistake by sending back
a NOT_LEADER_FOR_PARTITION.", it is probably no true. The problem here is
that the old leader X may still think it is the leader of the partition and
thus it will not send back NOT_LEADER_FOR_PARTITION. The reason is provided
in KAFKA-6262. Can you check if that makes sense?

Regards,
Dong


On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe  wrote:

> Hi Dong,
>
> Thanks for proposing this KIP.  I think a metadata epoch is a really good
> idea.
>
> I read through the DISCUSS thread, but I still don't have a clear picture
> of why the proposal uses a metadata epoch per partition rather than a
> global metadata epoch.  A metadata epoch per partition is kind of
> unpleasant-- it's at least 4 extra bytes per partition that we have to send
> over the wire in every full metadata request, which could become extra
> kilobytes on the wire when the number of partitions becomes large.  Plus,
> we have to update all the auxiliary classes to include an epoch.
>
> We need to have a global metadata epoch anyway to handle partition
> addition and deletion.  For example, if I give you
> MetadataResponse{part1,epoch 1, part2, epoch 1} and {part1, epoch1}, which
> MetadataResponse is newer?  You have no way of knowing.  It could be that
> part2 has just been created, and the response with 2 partitions is newer.
> Or it could be that part2 has just been deleted, and therefore the response
> with 1 partition is newer.  You must have a global epoch to disambiguate
> these two cases.
>
> Previously, I worked on the Ceph distributed filesystem.  Ceph had the
> concept of a map of the whole cluster, maintained by a few servers doing
> paxos.  This map was versioned by a single 64-bit epoch number which
> increased on every change.  It was propagated to clients through gossip.  I
> wonder if something similar could work here?
>
> It seems like the Kafka MetadataResponse serves two somewhat unrelated
> purposes.  Firstly, it lets clients know what partitions exist in the
> system and where they live.  Secondly, it lets clients know which nodes
> within the partition are in-sync (in the ISR) and which node is the leader.
>
> The first purpose is what you really need a metadata epoch for, I think.
> You want to know whether a partition exists or not, or you want to know
> which nodes you should talk to in order to write to a given partition.  A
> single metadata epoch for the whole response should be adequate here.  We
> should not change the partition assignment without going through zookeeper
> (or a similar system), and this inherently serializes updates into a
> numbered stream.  Brokers should also stop responding to requests when they
> are unable to contact ZK for a certain time period.  This prevents the case
> where a given partition has been moved off some set of nodes, but a client
> still ends up talking to those nodes and writing data there.
>
> For the second purpose, this is "soft state" anyway.  If the client thinks
> X is the leader but Y is really the leader, the client will talk to X, and
> X will point out its mistake by sending back a NOT_LEADER_FOR_PARTITION.
> Then the client can update its metadata again and find the new leader, if
> there is one.  There is no need for an epoch to handle this.  Similarly, I
> can't think of a reason why changing the in-sync replica set 
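To make the trade-off in this thread a bit more concrete, here is a rough sketch of the client-side check that per-partition epochs enable. All names are illustrative and none of this is the actual KIP-232 patch; it only shows how a (partitionEpoch, leaderEpoch) pair lets a client reject both metadata that predates a topic re-creation and the stale-leader scenario Jun described:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: metadata for a partition is only accepted if its
// (partitionEpoch, leaderEpoch) pair is not older than what the client has
// already seen. Names are illustrative, not taken from the KIP-232 patch.
public class PartitionEpochCache {

    public static final class Epochs {
        final int partitionEpoch;  // bumped when the partition is re-created
        final int leaderEpoch;     // bumped on every leader change

        Epochs(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }
    }

    private final Map<String, Epochs> cache = new HashMap<>();

    // Returns true if the incoming metadata should replace the cached entry,
    // i.e. it is at least as new as what we have seen before.
    public boolean maybeUpdate(String topicPartition, int partitionEpoch, int leaderEpoch) {
        Epochs cached = cache.get(topicPartition);
        if (cached != null) {
            if (partitionEpoch < cached.partitionEpoch)
                return false; // predates a topic re-creation we already know about
            if (partitionEpoch == cached.partitionEpoch && leaderEpoch < cached.leaderEpoch)
                return false; // stale leader information, e.g. the old-leader scenario above
        }
        cache.put(topicPartition, new Epochs(partitionEpoch, leaderEpoch));
        return true;
    }
}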

Re: [DISCUSS] KIP-250 Add Support for Quorum-based Producer Acknowledgment

2018-01-24 Thread Litao Deng
Thanks Jun for the detailed feedback.

Yes, for #1, I mean the live replicas from the ISR.

Actually, I believe for all of the 4 new leader election strategies
(offline, reassign, preferred replica and controlled shutdown), we need to
make corresponding changes. Will document the details in the KIP.

On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao  wrote:

> Hi, Litao,
>
> Thanks for the KIP. Good proposal. A few comments below.
>
> 1. The KIP says "select the live replica with the largest LEO".  I guess
> what you meant is selecting the live replicas in ISR with the largest LEO?
>
> 2. I agree that we can probably just reuse the current min.isr
> configuration, but with a slightly different semantics. Currently, if
> min.isr is set, a user expects the record to be in at least min.isr
> replicas on successful ack. This KIP guarantees this too. Most people are
> probably surprised that currently the ack is only sent back after all
> replicas in ISR receive the record. This KIP will change the ack to only
> wait on min.isr replicas, which matches the user's expectation and gives
> better latency. Currently, we guarantee no data loss if there are fewer
> than replication factor failures. The KIP changes that to fewer than
> min.isr failures. The latter probably matches the user expectation.
>
> 3. I agree that the new leader election process is a bit more complicated.
> The controller now needs to contact all replicas in ISR to determine who
> has the longest log. However, this happens infrequently. So, it's probably
> worth doing for the better latency in #2.
>
> 4. We have to think through the preferred leader election process.
> Currently, the first assigned replica is preferred for load balancing.
> There is a process to automatically move the leader to the preferred
> replica when it's in sync. The issue is that the preferred replica may not
> be the replica with the longest log. Naively switching to the preferred
> replica may cause data loss when there are actually fewer failures than
> configured min.isr. One way to address this issue is to do the following
> steps during preferred leader election: (a) controller sends an RPC request
> to the current leader; (b) the current leader stops taking new writes
> (sending a new error code to the clients) and returns its LEO (call it L)
> to the controller; (c) the controller issues an RPC request to the
> preferred replica and waits for its LEO to reach L; (d) the controller changes
> the leader to the preferred replica.
>
> Jun
>
> On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng  >
> wrote:
>
> > Sorry folks, just realized I didn't use the correct thread format for the
> > discussion. I started this new one and copied all of the responses from
> the
> > old one.
> >
> > @Dong
> > It makes sense to just use the min.insync.replicas instead of
> introducing a
> > new config, and we must make this change together with the LEO-based new
> > leader election.
> >
> > @Xi
> > I thought about embedding the LEO information to the ControllerContext,
> > didn't find a way. Using RPC will make the leader election period longer
> > and this should happen in very rare cases (broker failure, controlled
> > shutdown, preferred leader election and partition reassignment).
> >
> > @Jeff
> > The current leader election is to pick the first replica from AR which
> > exists both in the live brokers and ISR sets. I agree with you about
> > changing the current/default behavior will cause many confusions, and
> > that's the reason the title is "Add Support ...". In this case, we
> wouldn't
> > break any current promises and provide a separate option for our user.
> > In terms of KIP-250, I feel it is more like the "Semisynchronous
> > Replication" in the MySQL world, and yes it is something between acks=1
> and
> > acks=insync.replicas. Additionally, I feel KIP-250 and KIP-227 are
> > two orthogonal improvements. KIP-227 is to improve the replication
> protocol
> > (like the introduction of parallel replication in MySQL), and KIP-250 is
> an
> > enhancement for the replication architecture (sync, semi-sync, and
> async).
> >
> >
> > Dong Lin
> >
> > > Thanks for the KIP. I have one quick comment before you provide more
> > detail
> > > on how to select the leader with the largest LEO.
> > > Do you think it would make sense to change the default behavior of
> > acks=-1,
> > > such that broker will acknowledge the message once the message has been
> > > replicated to min.insync.replicas brokers? This would allow us to keep
> > the
> > > same durability guarantee, improve produce request latency without
> > having a
> > > new config.
> >
> >
> > Hu Xi
> >
> > > Currently,  with holding the assigned replicas(AR) for all partitions,
> > > controller is now able to elect new leaders by selecting the first
> > replica
> > > of AR which occurs in both live replica set and ISR. If switching to
> the
> > > LEO-based strategy, controller context might need 
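The LEO-based election rule itself is simple once the controller has collected the log end offsets (which, as discussed above, would likely require an extra round of RPCs). A rough sketch with illustrative names only - the real controller code is Scala and is not structured like this:

import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Sketch of the rule discussed in this thread: among the replicas that are
// both alive and in the ISR, pick the one with the largest log end offset.
// The logEndOffsets map is assumed to have been gathered by the controller.
public final class LeoBasedElection {

    public static Optional<Integer> electLeader(Set<Integer> isr,
                                                Set<Integer> liveBrokers,
                                                Map<Integer, Long> logEndOffsets) {
        return isr.stream()
                .filter(liveBrokers::contains)                       // must be alive
                .filter(logEndOffsets::containsKey)                  // must have reported an LEO
                .max(Comparator.comparingLong(logEndOffsets::get));  // largest LEO wins
    }
}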

Jenkins build is back to normal : kafka-trunk-jdk8 #2349

2018-01-24 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk7 #3114

2018-01-24 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6462: fix unstable ResetIntegrationTest (#4446)

--
[...truncated 1.87 MB...]

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryMapValuesState STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryMapValuesState PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldAllowToQueryAfterThreadDied STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldAllowToQueryAfterThreadDied PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithZeroSizedCache STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithZeroSizedCache PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryMapValuesAfterFilterState STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryMapValuesAfterFilterState PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryFilterState STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryFilterState PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithNonZeroSizedCache STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithNonZeroSizedCache PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduce STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduce PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldGroupByKey STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldGroupByKey PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduceWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduceWindowed PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest
 STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest
 PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingPattern STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingPattern PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingTopic STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingTopic PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets 
STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest
 STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest
 PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceSessionWindows STARTED


Re: [DISCUSS] KIP-250 Add Support for Quorum-based Producer Acknowledgment

2018-01-24 Thread Guozhang Wang
Thanks for the KIP Litao.

1. I agree with Dong that it would be better to reuse the existing
config if possible, and in that regard I also agree with Jun's point #2:
previously there has been confusion about the semantics of min.isr from the
user's perspective, and we have learned that it is actually not straightforward
to educate users about the designed semantics. So we can propose to just
change its semantics such that:

a. "If you want to tolerate X failures, set min.isr to X+1".
b. "You can, optionally, set num.replicas greater than min.isr to get better
latency" (i.e. redundancy to the rescue to reduce your latency tails)

2. Although you have not laid out the details about LEO-based leader
selection, my expectation is that it would involve round trips between
controller and broker. If we assume that such occurrences are not common,
this cost may be worthwhile.


Guozhang



On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao  wrote:

> Hi, Litao,
>
> Thanks for the KIP. Good proposal. A few comments below.
>
> 1. The KIP says "select the live replica with the largest LEO".  I guess
> what you meant is selecting the live replicas in ISR with the largest LEO?
>
> 2. I agree that we can probably just reuse the current min.isr
> configuration, but with a slightly different semantics. Currently, if
> min.isr is set, a user expects the record to be in at least min.isr
> replicas on successful ack. This KIP guarantees this too. Most people are
> probably surprised that currently the ack is only sent back after all
> replicas in ISR receive the record. This KIP will change the ack to only
> wait on min.isr replicas, which matches the user's expectation and gives
> better latency. Currently, we guarantee no data loss if there are fewer
> than replication factor failures. The KIP changes that to fewer than
> min.isr failures. The latter probably matches the user expectation.
>
> 3. I agree that the new leader election process is a bit more complicated.
> The controller now needs to contact all replicas in ISR to determine who
> has the longest log. However, this happens infrequently. So, it's probably
> worth doing for the better latency in #2.
>
> 4. We have to think through the preferred leader election process.
> Currently, the first assigned replica is preferred for load balancing.
> There is a process to automatically move the leader to the preferred
> replica when it's in sync. The issue is that the preferred replica may not
> be the replica with the longest log. Naively switching to the preferred
> replica may cause data loss when there are actually fewer failures than
> configured min.isr. One way to address this issue is to do the following
> steps during preferred leader election: (a) controller sends an RPC request
> to the current leader; (b) the current leader stops taking new writes
> (sending a new error code to the clients) and returns its LEO (call it L)
> to the controller; (c) the controller issues an RPC request to the
> preferred replica and waits for its LEO to reach L; (d) the controller changes
> the leader to the preferred replica.
>
> Jun
>
> On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng  >
> wrote:
>
> > Sorry folks, just realized I didn't use the correct thread format for the
> > discussion. I started this new one and copied all of the responses from
> the
> > old one.
> >
> > @Dong
> > It makes sense to just use the min.insync.replicas instead of
> introducing a
> > new config, and we must make this change together with the LEO-based new
> > leader election.
> >
> > @Xi
> > I thought about embedding the LEO information to the ControllerContext,
> > didn't find a way. Using RPC will make the leader election period longer
> > and this should happen in very rare cases (broker failure, controlled
> > shutdown, preferred leader election and partition reassignment).
> >
> > @Jeff
> > The current leader election is to pick the first replica from AR which
> > exists both in the live brokers and ISR sets. I agree with you about
> > changing the current/default behavior will cause many confusions, and
> > that's the reason the title is "Add Support ...". In this case, we
> wouldn't
> > break any current promises and provide a separate option for our user.
> > In terms of KIP-250, I feel it is more like the "Semisynchronous
> > Replication" in the MySQL world, and yes it is something between acks=1
> and
> > acks=insync.replicas. Additionally, I feel KIP-250 and KIP-227 are
> > two orthogonal improvements. KIP-227 is to improve the replication
> protocol
> > (like the introduction of parallel replication in MySQL), and KIP-250 is
> an
> > enhancement for the replication architecture (sync, semi-sync, and
> async).
> >
> >
> > Dong Lin
> >
> > > Thanks for the KIP. I have one quick comment before you provide more
> > detail
> > > on how to select the leader with the largest LEO.
> > > Do you think it would make sense to change the default behavior of
> > acks=-1,
> > > 
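For reference, the semantics Guozhang describes above map onto configuration that already exists today: a topic created with replication.factor=3 and min.insync.replicas=2 tolerates one failure, and a producer opts into the durability guarantee with acks=all. The sketch below only shows that producer side; the change proposed in this KIP is on the broker, which would acknowledge once min.insync.replicas replicas (rather than the full ISR) have the record:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Producer-side view of the semantics discussed above. Assumes the topic was
// created with replication.factor=3 and min.insync.replicas=2 ("tolerate 1
// failure"); the broker-side acknowledgment behavior is what the KIP changes.
public class QuorumAckProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // i.e. acks=-1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "key", "value"));
        }
    }
}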

Re: [VOTE] KIP-227: Introduce Fetch Requests that are Incremental to Increase Partition Scalability

2018-01-24 Thread Ismael Juma
Agreed, Jun.

Ismael

On Wed, Jan 24, 2018 at 4:08 PM, Jun Rao  wrote:

> Since this is a server side metric, it's probably better to use Yammer Rate
> (which has count) for consistency.
>
> Thanks,
>
> Jun
>
> On Tue, Jan 23, 2018 at 10:17 PM, Colin McCabe  wrote:
>
> > On Tue, Jan 23, 2018, at 21:47, Ismael Juma wrote:
> > > Colin,
> > >
> > > You get a cumulative count for rates since we added
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 187+-+Add+cumulative+count+metric+for+all+Kafka+rate+metrics>
> >
> > Oh, good point.
> >
> > C.
> >
> >
> > > Ismael
> > >
> > > On Tue, Jan 23, 2018 at 4:21 PM, Colin McCabe
> > >  wrote:>
> > > > On Tue, Jan 23, 2018, at 11:57, Jun Rao wrote:
> > > > > Hi, Collin,
> > > > >
> > > > > Thanks for the updated KIP. +1. Just a minor comment. It seems
> > > > > that it's> > > better for TotalIncrementalFetchSessionsEvicted to
> > be a rate,
> > > > > instead of> > > just an ever-growing count.
> > > >
> > > > Thanks.  Perhaps we can add the rate in addition to the total
> > > > eviction> > count?
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Jan 22, 2018 at 4:35 PM, Jason Gustafson
> > > > > > > wrote:
> > > > >
> > > > > > >
> > > > > > > What if we want to have fetch sessions for non-incremental
> > > > > > > fetches> > in the
> > > > > > > future, though?  Also, we don't expect this configuration to
> > > > > > > be> > changed
> > > > > > > often, so it doesn't really need to be short.
> > > > > >
> > > > > >
> > > > > > Hmm.. But in that case, I'm not sure we'd need to distinguish
> > > > > > the two> > > > cases. If the non-incremental sessions are
> > occupying space
> > > > proportional to
> > > > > > the fetched partitions, using the same config for both would be>
> >
> > reasonable.
> > > > > > If they are not (which is more likely), we probably wouldn't
> > > > > > need a> > config
> > > > > > at all. Given that, I'd probably still opt for the more concise
> > > > > > name.> > It's
> > > > > > not a blocker for me though.
> > > > > >
> > > > > > +1 on the KIP.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > > > On Mon, Jan 22, 2018 at 3:56 PM, Colin McCabe
> > > > > > > > wrote:
> > > > > >
> > > > > > > On Mon, Jan 22, 2018, at 15:42, Jason Gustafson wrote:
> > > > > > > > Hi Colin,
> > > > > > > >
> > > > > > > > This is looking good to me. A few comments:
> > > > > > > >
> > > > > > > > 1. The fetch type seems unnecessary in the request and
> > > > > > > >response> > schemas
> > > > > > > > since it can be inferred by the sessionId/epoch.
> > > > > > >
> > > > > > > Hi Jason,
> > > > > > >
> > > > > > > Fair enough... if we need it later, we can always bump the RPC>
> > > version.
> > > > > > >
> > > > > > > > 2. I agree with Jun that a separate array for partitions to
> > > > > > > >remove> > > > would
> > > > > > > be
> > > > > > > > more intuitive.
> > > > > > >
> > > > > > > OK.  I'll switch it to using a separate array.
> > > > > > >
> > > > > > > > 3. I'm not super thrilled with the cache configuration since
> > > > > > > >it> > seems
> > > > > > to
> > > > > > > > tie us a bit too closely to the implementation. You've
> > > > > > > > mostly> > convinced
> > > > > > > me
> > > > > > > > on the need for the slots config, but I wonder if we can at
> > > > > > > > least> > do
> > > > > > > > without "min.incremental.fetch.session.eviction.ms"? For
> > > > > > > > one, I> > think
> > > > > > > the
> > > > > > > > broker should reserve the right to evict sessions at will.
> > > > > > > > We> > shouldn't
> > > > > > > be
> > > > > > > > stuck maintaining a small session at the expense of a much
> > > > > > > > larger> > one
> > > > > > > just
> > > > > > > > to enforce this timeout. Internally, I think having some
> > > > > > > > cache> > > > stickiness
> > > > > > > > to avoid thrashing makes sense, but I think static values
> > > > > > > > are> > likely to
> > > > > > > be
> > > > > > > > good enough and that lets us retain some flexibility to
> > > > > > > > change the> > > > > behavior
> > > > > > > > in the future.
> > > > > > >
> > > > > > > OK.
> > > > > > >
> > > > > > > > 4. I think the word "incremental" is redundant in the config
> > > > > > > >names.> > > > Maybe
> > > > > > > > it could just be "max.fetch.session.cache.slots" for
> > > > > > > > example?> > > > >
> > > > > > > What if we want to have fetch sessions for non-incremental
> > > > > > > fetches> > in the
> > > > > > > future, though?  Also, we don't expect this configuration to
> > > > > > > be> > changed
> > > > > > > often, so it doesn't really need to be short.
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jason
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sat, Jan 20, 2018 at 12:54 PM, Colin McCabe

Re: [VOTE] KIP-227: Introduce Fetch Requests that are Incremental to Increase Partition Scalability

2018-01-24 Thread Jun Rao
Since this is a server side metric, it's probably better to use Yammer Rate
(which has count) for consistency.

Thanks,

Jun

On Tue, Jan 23, 2018 at 10:17 PM, Colin McCabe  wrote:

> On Tue, Jan 23, 2018, at 21:47, Ismael Juma wrote:
> > Colin,
> >
> > You get a cumulative count for rates since we added
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 187+-+Add+cumulative+count+metric+for+all+Kafka+rate+metrics>
>
> Oh, good point.
>
> C.
>
>
> > Ismael
> >
> > On Tue, Jan 23, 2018 at 4:21 PM, Colin McCabe
> >  wrote:>
> > > On Tue, Jan 23, 2018, at 11:57, Jun Rao wrote:
> > > > Hi, Collin,
> > > >
> > > > Thanks for the updated KIP. +1. Just a minor comment. It seems
> > > > that it's> > > better for TotalIncrementalFetchSessionsEvicted to
> be a rate,
> > > > instead of> > > just an ever-growing count.
> > >
> > > Thanks.  Perhaps we can add the rate in addition to the total
> > > eviction> > count?
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > Jun
> > > >
> > > > On Mon, Jan 22, 2018 at 4:35 PM, Jason Gustafson
> > > > > > wrote:
> > > >
> > > > > >
> > > > > > What if we want to have fetch sessions for non-incremental
> > > > > > fetches> > in the
> > > > > > future, though?  Also, we don't expect this configuration to
> > > > > > be> > changed
> > > > > > often, so it doesn't really need to be short.
> > > > >
> > > > >
> > > > > Hmm.. But in that case, I'm not sure we'd need to distinguish
> > > > > the two> > > > cases. If the non-incremental sessions are
> occupying space
> > > proportional to
> > > > > the fetched partitions, using the same config for both would be> >
> reasonable.
> > > > > If they are not (which is more likely), we probably wouldn't
> > > > > need a> > config
> > > > > at all. Given that, I'd probably still opt for the more concise
> > > > > name.> > It's
> > > > > not a blocker for me though.
> > > > >
> > > > > +1 on the KIP.
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Mon, Jan 22, 2018 at 3:56 PM, Colin McCabe
> > > > > > > wrote:
> > > > >
> > > > > > On Mon, Jan 22, 2018, at 15:42, Jason Gustafson wrote:
> > > > > > > Hi Colin,
> > > > > > >
> > > > > > > This is looking good to me. A few comments:
> > > > > > >
> > > > > > > 1. The fetch type seems unnecessary in the request and
> > > > > > >response> > schemas
> > > > > > > since it can be inferred by the sessionId/epoch.
> > > > > >
> > > > > > Hi Jason,
> > > > > >
> > > > > > Fair enough... if we need it later, we can always bump the RPC>
> > version.
> > > > > >
> > > > > > > 2. I agree with Jun that a separate array for partitions to
> > > > > > >remove> > > > would
> > > > > > be
> > > > > > > more intuitive.
> > > > > >
> > > > > > OK.  I'll switch it to using a separate array.
> > > > > >
> > > > > > > 3. I'm not super thrilled with the cache configuration since
> > > > > > >it> > seems
> > > > > to
> > > > > > > tie us a bit too closely to the implementation. You've
> > > > > > > mostly> > convinced
> > > > > > me
> > > > > > > on the need for the slots config, but I wonder if we can at
> > > > > > > least> > do
> > > > > > > without "min.incremental.fetch.session.eviction.ms"? For
> > > > > > > one, I> > think
> > > > > > the
> > > > > > > broker should reserve the right to evict sessions at will.
> > > > > > > We> > shouldn't
> > > > > > be
> > > > > > > stuck maintaining a small session at the expense of a much
> > > > > > > larger> > one
> > > > > > just
> > > > > > > to enforce this timeout. Internally, I think having some
> > > > > > > cache> > > > stickiness
> > > > > > > to avoid thrashing makes sense, but I think static values
> > > > > > > are> > likely to
> > > > > > be
> > > > > > > good enough and that lets us retain some flexibility to
> > > > > > > change the> > > > > behavior
> > > > > > > in the future.
> > > > > >
> > > > > > OK.
> > > > > >
> > > > > > > 4. I think the word "incremental" is redundant in the config
> > > > > > >names.> > > > Maybe
> > > > > > > it could just be "max.fetch.session.cache.slots" for
> > > > > > > example?> > > > >
> > > > > > What if we want to have fetch sessions for non-incremental
> > > > > > fetches> > in the
> > > > > > future, though?  Also, we don't expect this configuration to
> > > > > > be> > changed
> > > > > > often, so it doesn't really need to be short.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jason
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Jan 20, 2018 at 12:54 PM, Colin McCabe
> > > > > > >  > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > On Fri, Jan 19, 2018, at 15:02, Jun Rao wrote:
> > > > > > > > > Hi, Colin,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP. Looks good to me overall. Just a
> > > > > > > > > couple of> > more
> > > > > > > > > comments.
> > > > > > > > >
> > > > > > > > > 1. As I mentioned 
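For what the Yammer-based metric would look like: a Meter ("Rate") exposes the one/five/fifteen-minute rates and also carries a cumulative count, which is why it covers both the rate Jun asked for and the total Colin wanted to keep. The class and metric names below are made up for illustration, and the broker would normally register such a meter through its KafkaMetricsGroup helper rather than the raw Yammer API:

import java.util.concurrent.TimeUnit;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;

// Illustrative sketch only; names are placeholders, not the KIP-227 metric names.
public class FetchSessionCacheMetrics {
    private final Meter evictions =
            Metrics.newMeter(FetchSessionCacheMetrics.class,
                             "IncrementalFetchSessionEvictionsPerSec",
                             "evictions",
                             TimeUnit.SECONDS);

    public void recordEviction() {
        evictions.mark();         // updates the rate and the cumulative count
    }

    public long totalEvictions() {
        return evictions.count(); // the total eviction count
    }
}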

[jira] [Created] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-01-24 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6481:
-

 Summary: Improving performance of the function 
ControllerChannelManager.addUpdateMetadataRequestForBrokers
 Key: KAFKA-6481
 URL: https://issues.apache.org/jira/browse/KAFKA-6481
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to the OfflineReplica 
state, possibly because of a broker going offline, a call stack like the 
following will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
    inside a for-loop for each partition 
ReplicaStateMachine.doHandleStateChanges
ReplicaStateMachine.handleStateChanges
KafkaController.onReplicasBecomeOffline
KafkaController.onBrokerFailure


How to reproduce the problem:
1. Create a cluster with 2 brokers having ids 1 and 2
2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.
4. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`
5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
6. Verify that the following log message appears over 200 times in the 
controller.log file, one for each iteration of the a0 partitions:
 "Leader not yet assigned for partition [a0,..]. Skip sending 
UpdateMetadataRequest."
 
 What happened was 
 1. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
 2. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) entries of the logs above.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
Also I've verified that topic deletion for topic a1 still works fine.
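For clarity, the shape of the proposed change is roughly the following (a Java-flavored sketch with placeholder types; the real method is Scala inside ControllerChannelManager): mark beingDeleted only for the partitions the caller passed in, so the cost no longer depends on the size of partitionsToBeDeleted.

import java.util.Set;

// Sketch of the intended behaviour: only the partitions named in the call are
// processed, and partitionsToBeDeleted is consulted with a contains() lookup
// instead of being iterated in full. Types and names are placeholders.
public class UpdateMetadataSketch {

    interface PartitionInfoUpdater {
        void updateMetadataRequestPartitionInfo(String topicPartition, boolean beingDeleted);
    }

    static void addUpdateMetadataRequestForBrokers(Set<String> partitions,
                                                   Set<String> partitionsToBeDeleted,
                                                   PartitionInfoUpdater updater) {
        for (String tp : partitions) {
            // Cost is now proportional to |partitions|, not |partitionsToBeDeleted|.
            boolean beingDeleted = partitionsToBeDeleted.contains(tp);
            updater.updateMetadataRequestPartitionInfo(tp, beingDeleted);
        }
    }
}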



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk9 #329

2018-01-24 Thread Apache Jenkins Server
See 

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on ubuntu-eu2 (ubuntu trusty) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from 
https://github.com/apache/kafka.git
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:825)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1092)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1123)
at hudson.scm.SCM.checkout(SCM.java:495)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1202)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1724)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:421)
Caused by: hudson.plugins.git.GitException: Command "git config 
remote.origin.url https://github.com/apache/kafka.git" returned status code 4:
stdout: 
stderr: error: failed to write new configuration file .git/config.lock

at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:1970)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:1938)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:1934)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommand(CliGitAPIImpl.java:1572)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommand(CliGitAPIImpl.java:1584)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.setRemoteUrl(CliGitAPIImpl.java:1218)
at hudson.plugins.git.GitAPI.setRemoteUrl(GitAPI.java:160)
at sun.reflect.GeneratedMethodAccessor69.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
hudson.remoting.RemoteInvocationHandler$RPCRequest.perform(RemoteInvocationHandler.java:922)
at 
hudson.remoting.RemoteInvocationHandler$RPCRequest.call(RemoteInvocationHandler.java:896)
at 
hudson.remoting.RemoteInvocationHandler$RPCRequest.call(RemoteInvocationHandler.java:853)
at hudson.remoting.UserRequest.perform(UserRequest.java:207)
at hudson.remoting.UserRequest.perform(UserRequest.java:53)
at hudson.remoting.Request$2.run(Request.java:358)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Suppressed: hudson.remoting.Channel$CallSiteStackTrace: Remote call to 
ubuntu-eu2
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1693)
at hudson.remoting.UserResponse.retrieve(UserRequest.java:310)
at hudson.remoting.Channel.call(Channel.java:908)
at 
hudson.remoting.RemoteInvocationHandler.invoke(RemoteInvocationHandler.java:281)
at com.sun.proxy.$Proxy110.setRemoteUrl(Unknown Source)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl.setRemoteUrl(RemoteGitImpl.java:295)
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:813)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1092)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1123)
at hudson.scm.SCM.checkout(SCM.java:495)
at 
hudson.model.AbstractProject.checkout(AbstractProject.java:1202)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at 
jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1724)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at 

Re: [DISCUSS] KIP-250 Add Support for Quorum-based Producer Acknowledgment

2018-01-24 Thread Litao Deng
Sorry folks, just realized I didn't use the correct thread format for the
discussion. I started this new one and copied all of the responses from the
old one.

@Dong
It makes sense to just use the min.insync.replicas instead of introducing a
new config, and we must make this change together with the LEO-based new
leader election.

@Xi
I thought about embedding the LEO information in the ControllerContext, but
didn't find a way. Using RPC will make the leader election period longer,
but this should happen only in very rare cases (broker failure, controlled
shutdown, preferred leader election and partition reassignment).

@Jeff
The current leader election is to pick the first replica from the AR which
exists in both the live broker and ISR sets. I agree with you that
changing the current/default behavior would cause a lot of confusion, and
that's the reason the title is "Add Support ...". This way, we wouldn't
break any current promises and would provide a separate option for our users.
In terms of KIP-250, I feel it is more like the "Semisynchronous
Replication" in the MySQL world, and yes it is something between acks=1 and
acks=insync.replicas. Additionally, I feel KIP-250 and KIP-227 are
two orthogonal improvements. KIP-227 is to improve the replication protocol
(like the introduction of parallel replication in MySQL), and KIP-250 is an
enhancement for the replication architecture (sync, semi-sync, and async).


Dong Lin

> Thanks for the KIP. I have one quick comment before you provide more detail
> on how to select the leader with the largest LEO.
> Do you think it would make sense to change the default behavior of acks=-1,
> such that broker will acknowledge the message once the message has been
> replicated to min.insync.replicas brokers? This would allow us to keep the
> same durability guarantee, improve produce request latency without having a
> new config.


Hu Xi

> Currently,  with holding the assigned replicas(AR) for all partitions,
> controller is now able to elect new leaders by selecting the first replica
> of AR which occurs in both live replica set and ISR. If switching to the
> LEO-based strategy, controller context might need to be enriched or
> augmented to store those values.  If retrieving those LEOs real-time,
> several rounds of RPCs are unavoidable which seems to violate the  original
> intention of this KIP.​


Jeff Widman

> I agree with Dong, we should see if it's possible to change the default
> behavior so that as soon as min.insync.replicas brokers respond than the
> broker acknowledges the message back to the client without waiting for
> additional brokers who are in the in-sync replica list to respond. (I
> actually thought it already worked this way).
> As you implied in the KIP though, changing this default introduces a weird
> state where an in-sync follower broker is not guaranteed to have a
> message...
> So at a minimum, the leadership failover algorithm would need to be sure to
> pick the most up-to-date follower... I thought it already did this?
> But if multiple brokers fail in quick succession, then a broker that was in
> the ISR could become a leader without ever receiving the message...
> violating the current promises of unclean.leader.election.enable=False...
> so changing the default might be not be a tenable solution.
> What also jumped out at me in the KIP was the goal of reducing p999 when
> setting replica lag time at 10 seconds(!!)... I understand the desire to
> minimize frequent ISR shrink/expansion, as I face this same issue at my day
> job. But what you're essentially trying to do here is create an additional
> replication state that is in-between acks=1 and acks = ISR to paper over a
> root problem of ISR shrink/expansion...
> I'm just wary of shipping more features (and more operational confusion) if
> it's only addressing the symptom rather than the root cause. For example,
> my day job's problem is we run a very high number of low-traffic
> partitions-per-broker, so the fetch requests hit many partitions before
> they fill. Solving that requires changing our architecture + making the
> replication protocol more efficient (KIP-227).


On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng  wrote:

> Hey folks. I would like to add a feature to support the quorum-based
> acknowledgment for the producer request. We have been running a modified
> version of Kafka on our testing cluster for weeks; the improvement of P999
> is significant with very stable latency. Additionally, I have a proposal to
> achieve a similar data durability as with the insync.replicas-based
> acknowledgment through LEO-based leader election.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 250+Add+Support+for+Quorum-based+Producer+Acknowledge
>


Re: [DISCUSS] Improving ACLs by allowing ip ranges and subnet expressions?

2018-01-24 Thread Colin McCabe
Hi Sonke,

IP address based security doesn't really work, though.  Users can spoof IP 
addresses.  They can poison the ARP cache on a local network, or impersonate a 
DNS server.

For users who want some access controls, but don't care about security, maybe 
we should make it easier to use and create users without enabling kerberos or 
similar?

best,
Colin


On Wed, Jan 24, 2018, at 12:59, Sönke Liebau wrote:
> Hi everyone,
> 
> the current ACL functionality in Kafka is a bit limited concerning
> host-based rules when specifying multiple hosts. A common scenario for
> this would be that you have a YARN cluster running Spark jobs that
> access Kafka and want to create ACLs based on the IP addresses of the
> cluster nodes.
> Currently kafka-acls only allows specifying individual IPs, so this
> would look like
> 
> ./kafka-acls --add --producer \
> --topic test --authorizer-properties zookeeper.connect=localhost:2181 \
> --allow-principal User:spark \
> --allow-host 10.0.0.10 \
> --allow-host 10.0.0.11 \
> --allow-host ...
> 
> which can get unwieldy if you have a 200 node cluster. Internally this
> command would not create a single ACL with multiple host entries, but
> rather one ACL per host that is specified on the command line, which
> makes the ACL listing a bit confusing.
> 
> There are currently a few jiras in various states around this topic:
> KAFKA-3531 [1], KAFKA-4759 [2], KAFKA-4985 [3] & KAFKA-5713 [4]
> 
> KAFKA-4759 has a patch available, but would currently only add
> interpretation of CIDR notation, no specific ranges, which I think
> could easily be added.
> 
> Colin McCabe commented in KAFKA-4985 that this had not been implemented
> so far because no standard for expressing IP ranges with a fast
> implementation had been found. The available patch uses the
> ipmath [5] package for parsing expressions and range checking - which
> seems fairly small and focused.
> 
> This would allow for expressions of the following type:
> 10.0.0.1
> 10.0.0.1-10.0.0.10
> 10.0.0.0/24
> 
> I'd suggest extending this a little to allow a semicolon separated
> list of values:
> 10.0.0.1;10.0.0.1-10.0.0.10;10.0.0.0/24
> 
> Performance considerations
> Internally the ipmath package represents IP addresses as longs, so if
> we stick with the example of a 200-node cluster from above, the
> current implementation would do 200 string comparisons for every
> request, whereas with a range it could potentially come down to two
> long comparisons. This is of course a back-of-the-envelope calculation
> at best, but there at least seems to be a case for investigating this
> a bit further.
> 
> 
> These changes would probably necessitate a KIP - though with some
> consideration they could be made in a way that no existing public
> facing functionality is changed, but for transparency and proper
> documentation I'd say a KIP would be preferable.
> 
> I'd be happy to draft one if people think this is worthwhile.
> 
> Let me know what you think.
> 
> best regards,
> Sönke
> 
> [1] https://issues.apache.org/jira/browse/KAFKA-3531
> [2] https://issues.apache.org/jira/browse/KAFKA-4759
> [3] https://issues.apache.org/jira/browse/KAFKA-4985
> [4] https://issues.apache.org/jira/browse/KAFKA-5713
> [5] https://github.com/jgonian/commons-ip-math


Build failed in Jenkins: kafka-trunk-jdk8 #2348

2018-01-24 Thread Apache Jenkins Server
See 

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on ubuntu-eu2 (ubuntu trusty) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from 
https://github.com/apache/kafka.git
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:825)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1092)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1123)
at hudson.scm.SCM.checkout(SCM.java:495)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1202)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1724)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:421)
Caused by: hudson.plugins.git.GitException: Command "git config 
remote.origin.url https://github.com/apache/kafka.git" returned status code 4:
stdout: 
stderr: error: failed to write new configuration file .git/config.lock

at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:1970)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:1938)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:1934)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommand(CliGitAPIImpl.java:1572)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommand(CliGitAPIImpl.java:1584)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.setRemoteUrl(CliGitAPIImpl.java:1218)
at hudson.plugins.git.GitAPI.setRemoteUrl(GitAPI.java:160)
at sun.reflect.GeneratedMethodAccessor69.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
hudson.remoting.RemoteInvocationHandler$RPCRequest.perform(RemoteInvocationHandler.java:922)
at 
hudson.remoting.RemoteInvocationHandler$RPCRequest.call(RemoteInvocationHandler.java:896)
at 
hudson.remoting.RemoteInvocationHandler$RPCRequest.call(RemoteInvocationHandler.java:853)
at hudson.remoting.UserRequest.perform(UserRequest.java:207)
at hudson.remoting.UserRequest.perform(UserRequest.java:53)
at hudson.remoting.Request$2.run(Request.java:358)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Suppressed: hudson.remoting.Channel$CallSiteStackTrace: Remote call to 
ubuntu-eu2
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1693)
at hudson.remoting.UserResponse.retrieve(UserRequest.java:310)
at hudson.remoting.Channel.call(Channel.java:908)
at 
hudson.remoting.RemoteInvocationHandler.invoke(RemoteInvocationHandler.java:281)
at com.sun.proxy.$Proxy110.setRemoteUrl(Unknown Source)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl.setRemoteUrl(RemoteGitImpl.java:295)
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:813)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1092)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1123)
at hudson.scm.SCM.checkout(SCM.java:495)
at 
hudson.model.AbstractProject.checkout(AbstractProject.java:1202)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at 
jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1724)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at 

Build failed in Jenkins: kafka-trunk-jdk9 #328

2018-01-24 Thread Apache Jenkins Server
See 

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on ubuntu-eu2 (ubuntu trusty) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from 
https://github.com/apache/kafka.git
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:825)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1092)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1123)
at hudson.scm.SCM.checkout(SCM.java:495)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1202)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1724)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:421)
Caused by: hudson.plugins.git.GitException: Command "git config 
remote.origin.url https://github.com/apache/kafka.git" returned status code 4:
stdout: 
stderr: error: failed to write new configuration file .git/config.lock

at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:1970)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:1938)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:1934)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommand(CliGitAPIImpl.java:1572)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommand(CliGitAPIImpl.java:1584)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.setRemoteUrl(CliGitAPIImpl.java:1218)
at hudson.plugins.git.GitAPI.setRemoteUrl(GitAPI.java:160)
at sun.reflect.GeneratedMethodAccessor69.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
hudson.remoting.RemoteInvocationHandler$RPCRequest.perform(RemoteInvocationHandler.java:922)
at 
hudson.remoting.RemoteInvocationHandler$RPCRequest.call(RemoteInvocationHandler.java:896)
at 
hudson.remoting.RemoteInvocationHandler$RPCRequest.call(RemoteInvocationHandler.java:853)
at hudson.remoting.UserRequest.perform(UserRequest.java:207)
at hudson.remoting.UserRequest.perform(UserRequest.java:53)
at hudson.remoting.Request$2.run(Request.java:358)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Suppressed: hudson.remoting.Channel$CallSiteStackTrace: Remote call to 
ubuntu-eu2
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1693)
at hudson.remoting.UserResponse.retrieve(UserRequest.java:310)
at hudson.remoting.Channel.call(Channel.java:908)
at 
hudson.remoting.RemoteInvocationHandler.invoke(RemoteInvocationHandler.java:281)
at com.sun.proxy.$Proxy110.setRemoteUrl(Unknown Source)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl.setRemoteUrl(RemoteGitImpl.java:295)
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:813)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1092)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1123)
at hudson.scm.SCM.checkout(SCM.java:495)
at 
hudson.model.AbstractProject.checkout(AbstractProject.java:1202)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at 
jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1724)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at 

Re: Streams - retry configuration publishing changelog updates?

2018-01-24 Thread Guozhang Wang
Hello Dan,

It seems you are hitting a known issue that KIP-91 is trying to fix (it is
a general issue of the producer itself):
https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

This means that the records were never sent out before expiring, because the
metadata for finding the leader of "my-table-changelog-18" was never known.
This is most likely because your broker is not available, so the metadata
refresh never succeeds. The retries config does not help here, since the
records are never actually sent.

Before KIP-91 gets merged in, you will have to increase your
request.timeout.ms to a larger value (by the way, it is already as high as
30 seconds; do you have a broker offline for about that long?) so that the
producer can tolerate a longer period in which metadata is not available for
sending.
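
In case it is useful, producer-level settings like this can be raised from
the Streams configuration itself by prefixing them. A minimal sketch,
assuming a Streams version that has StreamsConfig.producerPrefix (0.10.2
and later); the application id, bootstrap address and timeout values are
placeholders, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "changelog-retry-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Prefixed keys are forwarded to the internal producers that write changelogs.
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 120000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 30);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
// ... pass props to new KafkaStreams(topology, props) as usual.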

Guozhang


On Tue, Jan 23, 2018 at 5:27 PM, dan bress  wrote:

> I'm seeing this timeout in my Kafka Streams application logs:
>
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7
> record(s) for my-table-changelog-18: 30011 ms has passed since last append
>
> What kind of retries do I get by default publishing to changelogs?  I have
> these producer configs set:
>
> 2018-01-23 21:14:31,197 INFO [main] ProducerConfig ProducerConfig values:
> acks = all
> retries = 30
> retry.backoff.ms = 1000
> request.timeout.ms = 30000
>
> Does this mean I am getting 30 retries when trying to publish to
> changelogs?  Or do I need to set another config(StreamsConfig) to get more
> retries for changelog publishing?
>
> Thanks!
> Dan
>



-- 
-- Guozhang


Re: 1.1 KIPs

2018-01-24 Thread Vahid S Hashemian
Hi Damian,

Could you please add KIP-229 to the list? It was approved earlier this 
week
https://www.mail-archive.com/dev@kafka.apache.org/msg84851.html

Thanks for running the release.
--Vahid




From:   Damian Guy 
To: dev@kafka.apache.org
Date:   01/24/2018 01:20 PM
Subject: Re: 1.1 KIPs



Hi,

The KIP deadline has passed and i've updated the release plan:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957546

If there is anything i've missed please let me know.

The feature freeze deadline is January 30th. At this point i'll cut the
branch for 1.1. So please make sure any major features have been committed
by then.

Thanks,
Damian

On Tue, 23 Jan 2018 at 15:29 Damian Guy  wrote:

> Hi,
>
> Today is the KIP deadline. Tomorrow I'll update the release page with 
any
> KIPS that have recently been voted on and accepted according to the 
process
> that can be found here:
> 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Process

>
> Thanks,
> Damian
>
> On Thu, 18 Jan 2018 at 12:47 Damian Guy  wrote:
>
>> Gentle reminder that KIP deadline is just 5 days away. If there is
>> anything that wants to be in 1.1 and hasn't been voted on yet, now is 
the
>> time!
>>
>> On Thu, 18 Jan 2018 at 08:49 Damian Guy  wrote:
>>
>>> Hi Xavier,
>>> I'll add it to the plan.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Tue, 16 Jan 2018 at 19:04 Xavier Léauté  
wrote:
>>>
 Hi Damian, I believe the list should also include KAFKA-5886 (KIP-91)
 which
 was voted for 1.0 but wasn't ready to be merged in time.

 On Tue, Jan 16, 2018 at 5:13 AM Damian Guy 
 wrote:

 > Hi,
 >
 > This is a reminder that we have one week left until the KIP 
deadline
 of Jan
 > 23. There are still some KIPs that are under discussion and/or 
being
 voted
 > on. Please keep in mind that the voting needs to be complete before
 the
 > deadline for the KIP to be added to the release.
 >
 >
 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957546

 >
 > Thanks,
 > Damian
 >

>>>






Re: 1.1 KIPs

2018-01-24 Thread Damian Guy
Hi,

The KIP deadline has passed and i've updated the release plan:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957546
If there is anything i've missed please let me know.

The feature freeze deadline is January 30th. At this point i'll cut the
branch for 1.1. So please make sure any major features have been committed
by then.

Thanks,
Damian

On Tue, 23 Jan 2018 at 15:29 Damian Guy  wrote:

> Hi,
>
> Today is the KIP deadline. Tomorrow I'll update the release page with any
> KIPS that have recently been voted on and accepted according to the process
> that can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Process
>
> Thanks,
> Damian
>
> On Thu, 18 Jan 2018 at 12:47 Damian Guy  wrote:
>
>> Gentle reminder that KIP deadline is just 5 days away. If there is
>> anything that wants to be in 1.1 and hasn't been voted on yet, now is the
>> time!
>>
>> On Thu, 18 Jan 2018 at 08:49 Damian Guy  wrote:
>>
>>> Hi Xavier,
>>> I'll add it to the plan.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Tue, 16 Jan 2018 at 19:04 Xavier Léauté  wrote:
>>>
 Hi Damian, I believe the list should also include KAFKA-5886 (KIP-91)
 which
 was voted for 1.0 but wasn't ready to be merged in time.

 On Tue, Jan 16, 2018 at 5:13 AM Damian Guy 
 wrote:

 > Hi,
 >
 > This is a reminder that we have one week left until the KIP deadline
 of Jan
 > 23. There are still some KIPs that are under discussion and/or being
 voted
 > on. Please keep in mind that the voting needs to be complete before
 the
 > deadline for the KIP to be added to the release.
 >
 >
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957546
 >
 > Thanks,
 > Damian
 >

>>>


[DISCUSS] Improving ACLs by allowing ip ranges and subnet expressions?

2018-01-24 Thread Sönke Liebau
Hi everyone,

the current ACL functionality in Kafka is a bit limited concerning
host based rules when specifying multiple hosts. A common scenario for
this would be that you have a YARN cluster running Spark jobs that
access Kafka and want to create ACLs based on the IP addresses of the
cluster nodes.
Currently kafka-acls only allows you to specify individual IPs, so this
would look like

./kafka-acls --add --producer \
--topic test --authorizer-properties zookeeper.connect=localhost:2181 \
--allow-principal User:spark \
--allow-host 10.0.0.10 \
--allow-host 10.0.0.11 \
--allow-host ...

which can get unwieldy if you have a 200 node cluster. Internally this
command would not create a single ACL with multiple host entries, but
rather one ACL per host that is specified on the command line, which
makes the ACL listing a bit confusing.

There are currently a few jiras in various states around this topic:
KAFKA-3531 [1], KAFKA-4759 [2], KAFKA-4985 [3] & KAFKA-5713 [4]

KAFKA-4759 has a patch available, but would currently only add
interpretation of CIDR notation, no specific ranges, which I think
could easily be added.

Colin McCabe commented in KAFKA-4985 that this was not implemented so
far because no standard for expressing IP ranges with a fast
implementation had been found. The available patch uses the
ipmath [5] package for parsing expressions and range checking, which
seems fairly small and focused.

This would allow for expressions of the following type:
10.0.0.1
10.0.0.1-10.0.0.10
10.0.0.0/24

I'd suggest extending this a little to allow a semicolon separated
list of values:
10.0.0.1;10.0.0.1-10.0.0.10;10.0.0.0/24

Performance considerations
Internally the ipmath package represents ip addresses as longs, so if
we stick with the example of a 200 node cluster from above, with the
current implementation that would be 200 string comparisons for every
request, whereas with a range it could potentially come down to two
long comparisons. This is of course a back-of-the-envelope calculation
at best, but there at least seems to be a case for investigating this
a bit further.
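
To make the arithmetic concrete, the range check really does reduce to
packing an IPv4 address into a single number and comparing twice. The
sketch below is plain Java written for illustration only; the class and
method names are made up here and are not the ipmath API:

import java.net.InetAddress;
import java.net.UnknownHostException;

public final class Ipv4Range {
    private final long low;
    private final long high;

    public Ipv4Range(String lowIp, String highIp) throws UnknownHostException {
        this.low = toLong(lowIp);
        this.high = toLong(highIp);
    }

    // Pack the four octets into one long so membership is two comparisons.
    // Assumes IPv4 addresses (four bytes from getAddress()).
    static long toLong(String ip) throws UnknownHostException {
        byte[] octets = InetAddress.getByName(ip).getAddress();
        long value = 0;
        for (byte octet : octets)
            value = (value << 8) | (octet & 0xFF);
        return value;
    }

    public boolean contains(String ip) throws UnknownHostException {
        long value = toLong(ip);
        return value >= low && value <= high;   // the two long comparisons
    }
}

// Example: new Ipv4Range("10.0.0.1", "10.0.0.200").contains("10.0.0.42") is true.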


These changes would probably necessitate a KIP - though with some
consideration they could be made in a way that no existing public
facing functionality is changed, but for transparency and proper
documentation I'd say a KIP would be preferable.

I'd be happy to draft one if people think this is worthwhile.

Let me know what you think.

best regards,
Sönke

[1] https://issues.apache.org/jira/browse/KAFKA-3531
[2] https://issues.apache.org/jira/browse/KAFKA-4759
[3] https://issues.apache.org/jira/browse/KAFKA-4985
[4] https://issues.apache.org/jira/browse/KAFKA-5713
[5] https://github.com/jgonian/commons-ip-math


[jira] [Created] (KAFKA-6480) Add config to enforce max fetch size on the broker

2018-01-24 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6480:
--

 Summary: Add config to enforce max fetch size on the broker
 Key: KAFKA-6480
 URL: https://issues.apache.org/jira/browse/KAFKA-6480
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


Users are increasingly hitting memory problems due to message format 
down-conversion. The problem is basically that we have to do the 
down-conversion in memory. Since the default fetch size is 50Mb, it doesn't 
take that many fetch requests to cause an OOM. One mitigation is KAFKA-6352. It 
would also be helpful if the broker had a configuration to restrict the maximum 
allowed fetch size across all consumers. This would also prevent a malicious 
client from using this in order to DoS the server.
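
For illustration, the fetch size is currently controlled entirely by the client, 
so a consumer can legitimately ask for fetches well above the default. In the 
sketch below the broker address and topic are placeholders, and 52428800 bytes 
(50 MB) is the documented default of fetch.max.bytes:

{code:java}
Properties config = new Properties();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "downconversion-test");
config.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// Nothing on the broker caps these values today; each such fetch may have to be
// down-converted in broker memory when the consumer needs an older message format.
config.put("fetch.max.bytes", 104857600);            // twice the 50 MB default
config.put("max.partition.fetch.bytes", 104857600);

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Collections.singletonList("large-messages"));
consumer.poll(1000);
{code}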



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-232: Detect outdated metadata using leaderEpoch and partitionEpoch

2018-01-24 Thread Colin McCabe
Hi Dong,

Thanks for proposing this KIP.  I think a metadata epoch is a really good idea.

I read through the DISCUSS thread, but I still don't have a clear picture of 
why the proposal uses a metadata epoch per partition rather than a global 
metadata epoch.  A metadata epoch per partition is kind of unpleasant-- it's at 
least 4 extra bytes per partition that we have to send over the wire in every 
full metadata request, which could become extra kilobytes on the wire when the 
number of partitions becomes large.  Plus, we have to update all the auxiliary 
classes to include an epoch.

We need to have a global metadata epoch anyway to handle partition addition and 
deletion.  For example, if I give you MetadataResponse{part1,epoch 1, part2, 
epoch 1} and {part1, epoch1}, which MetadataResponse is newer?  You have no way 
of knowing.  It could be that part2 has just been created, and the response 
with 2 partitions is newer.  Or it could be that part2 has just been deleted, 
and therefore the response with 1 partition is newer.  You must have a global 
epoch to disambiguate these two cases.
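
To spell the comparison out, a client-side cache keyed on one global epoch 
resolves that ambiguity with a single check. This is an illustrative sketch, 
not Kafka's actual metadata handling:

import java.util.Collections;
import java.util.List;
import java.util.Map;

final class CachedClusterView {
    private long epoch = -1L;
    private Map<String, List<Integer>> partitionsByTopic = Collections.emptyMap();

    // Returns true if the response was applied, false if it was stale.
    synchronized boolean maybeApply(long responseEpoch, Map<String, List<Integer>> view) {
        if (responseEpoch <= epoch)
            return false;                // older or duplicate view, ignore it
        epoch = responseEpoch;           // the newer epoch wins, whether partitions
        partitionsByTopic = view;        // were added or deleted in the meantime
        return true;
    }
}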

Previously, I worked on the Ceph distributed filesystem.  Ceph had the concept 
of a map of the whole cluster, maintained by a few servers doing paxos.  This 
map was versioned by a single 64-bit epoch number which increased on every 
change.  It was propagated to clients through gossip.  I wonder if something 
similar could work here?

It seems like the Kafka MetadataResponse serves two somewhat unrelated 
purposes.  Firstly, it lets clients know what partitions exist in the system 
and where they live.  Secondly, it lets clients know which nodes within the 
partition are in-sync (in the ISR) and which node is the leader.

The first purpose is what you really need a metadata epoch for, I think.  You 
want to know whether a partition exists or not, or you want to know which nodes 
you should talk to in order to write to a given partition.  A single metadata 
epoch for the whole response should be adequate here.  We should not change the 
partition assignment without going through zookeeper (or a similar system), and 
this inherently serializes updates into a numbered stream.  Brokers should also 
stop responding to requests when they are unable to contact ZK for a certain 
time period.  This prevents the case where a given partition has been moved off 
some set of nodes, but a client still ends up talking to those nodes and 
writing data there.

For the second purpose, this is "soft state" anyway.  If the client thinks X is 
the leader but Y is really the leader, the client will talk to X, and X will 
point out its mistake by sending back a NOT_LEADER_FOR_PARTITION.  Then the 
client can update its metadata again and find the new leader, if there is one.  
There is no need for an epoch to handle this.  Similarly, I can't think of a 
reason why changing the in-sync replica set needs to bump the epoch.

best,
Colin


On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
> Thanks much for reviewing the KIP!
> 
> Dong
> 
> On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang  wrote:
> 
> > Yeah that makes sense, again I'm just making sure we understand all the
> > scenarios and what to expect.
> >
> > I agree that if, more generally speaking, say users have only consumed to
> > offset 8, and then call seek(16) to "jump" to a further position, then she
> > needs to be aware that OORE may be thrown and she needs to handle it or rely
> > on reset policy which should not surprise her.
> >
> >
> > I'm +1 on the KIP.
> >
> > Guozhang
> >
> >
> > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin  wrote:
> >
> > > Yes, in general we can not prevent OffsetOutOfRangeException if user
> > seeks
> > > to a wrong offset. The main goal is to prevent OffsetOutOfRangeException
> > if
> > > user has done things in the right way, e.g. user should know that there
> > is
> > > message with this offset.
> > >
> > > For example, if user calls seek(..) right after construction, the only
> > > reason I can think of is that user stores offset externally. In this
> > case,
> > > user currently needs to use the offset which is obtained using
> > position(..)
> > > from the last run. With this KIP, user needs to get the offset and the
> > > offsetEpoch using positionAndOffsetEpoch(...) and stores these
> > information
> > > externally. The next time user starts consumer, he/she needs to call
> > > seek(..., offset, offsetEpoch) right after construction. Then KIP should
> > be
> > > able to ensure that we don't throw OffsetOutOfRangeException if there is
> > no
> > > unclean leader election.
> > >
> > > Does this sound OK?
> > >
> > > Regards,
> > > Dong
> > >
> > >
> > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang 
> > > wrote:
> > >
> > > > "If consumer wants to consume message with offset 16, then consumer
> > must
> > > > have
> > > > already fetched message with offset 15"
> > > >
> > > > --> this 

[jira] [Created] (KAFKA-6479) Broker file descriptor leak after consumer request timeout

2018-01-24 Thread Ryan Leslie (JIRA)
Ryan Leslie created KAFKA-6479:
--

 Summary: Broker file descriptor leak after consumer request timeout
 Key: KAFKA-6479
 URL: https://issues.apache.org/jira/browse/KAFKA-6479
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 1.0.0
Reporter: Ryan Leslie


When a consumer request times out, i.e. takes longer than request.timeout.ms, 
and the client disconnects from the coordinator, the coordinator may leak file 
descriptors. The following code produces this behavior:


{code:java}
Properties config = new Properties();
config.put("bootstrap.servers", BROKERS);
config.put("group.id", "leak-test");
config.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
config.put("max.poll.interval.ms", Integer.MAX_VALUE);
config.put("request.timeout.ms", 12000);

KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(config);
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(config);

List<String> topics = Collections.singletonList("leak-test");
consumer1.subscribe(topics);
consumer2.subscribe(topics);

consumer1.poll(100); 
consumer2.poll(100);
{code}

When the above executes, consumer 2 will attempt to rebalance indefinitely 
(blocked by the inactive consumer 1), logging a _Marking the coordinator dead_ 
message every 12 seconds after giving up on the JOIN_GROUP request and 
disconnecting. Unless the consumer exits or times out, this will cause a socket 
in CLOSE_WAIT to leak in the coordinator and the broker will eventually run out 
of file descriptors and crash.

Aside from faulty code as in the example above, or an intentional DoS, any 
client bug causing a consumer to block, e.g. KAFKA-6397, could also result in 
this leak.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2018-01-24 Thread Rajini Sivaram
Hi Ismael,

Yes, that makes sense. Looking at the command line options for different
tools, we seem to be using *--command-config  *in the commands
that currently talk to the new AdminClient (DelegationTokenCommand,
ConsumerGroupCommand, DeleteRecordsCommand). So perhaps it makes sense to
do the same for ConfigCommand as well. I will update KIP-226 with the two
options *--bootstrap-server* and *--command-config*.
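
For reference, whichever option names are chosen, the command ends up driving
the new AdminClient's alterConfigs call for a BROKER ConfigResource. A minimal
sketch of the equivalent direct API call; the broker id "0", the config key and
the bootstrap address are placeholder values:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
    Config update = new Config(Collections.singletonList(
            new ConfigEntry("log.cleaner.threads", "2")));   // any dynamically updatable key
    admin.alterConfigs(Collections.singletonMap(broker0, update)).all().get();
}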

Viktor, what do you think?

At the moment, I think many in the community are busy due to the code
freeze next week, but hopefully we should get more feedback on KIP-248 soon
after.

Thank you,

Rajini

On Wed, Jan 24, 2018 at 5:41 AM, Viktor Somogyi 
wrote:

> Hi all,
>
> I'd also like to ask the community members here who were participating in
> the discussion of KIP-226 to take a look at KIP-248 (that is making
> kafka-configs.sh fully function with AdminClient and a Java based
> ConfigCommand). It would be much appreciated to get feedback on that as it
> plays an important role for KIP-226 and other long-waited features.
>
> Thanks,
> Viktor
>
> On Wed, Jan 24, 2018 at 6:56 AM, Ismael Juma  wrote:
>
> > Hi Rajini,
> >
> > I think the proposal makes sense. One suggestion: can we just allow the
> > config to be passed? That is, leave out the properties config for now.
> >
> > On Tue, Jan 23, 2018 at 3:01 PM, Rajini Sivaram  >
> > wrote:
> >
> > > Since we are running out of time to get the whole ConfigCommand
> converted
> > > to using the new AdminClient for 1.1.0 (KIP-248), we need a way to
> enable
> > > ConfigCommand to handle broker config updates (implemented by KIP-226).
> > As
> > > a simple first step, it would make sense to use the existing
> > ConfigCommand
> > > tool to perform broker config updates enabled by this KIP. Since config
> > > validation and password encryption are performed by the broker, this
> will
> > > be easier to do with the new AdminClient. To do this, we need to add
> > > command line options for new admin client to kafka-configs.sh. Dynamic
> > > broker config updates alone will be done under KIP-226 using the new
> > admin
> > > client to make this feature usable. The new command line options
> > > (consistent with KIP-248) that will be added to ConfigCommand will be:
> > >
> > >- --bootstrap-server *host:port*
> > >- --adminclient.config *config-file*
> > >- --adminclient.properties *k1=v1,k2=v2*
> > >
> > > If anyone has any concerns about these options being added to
> > > kafka-configs.sh, please let me know. Otherwise, I will update KIP-226
> > and
> > > add the options to one of the KIP-226 PRs.
> > >
> > > Thank you,
> > >
> > > Rajini
> > >
> > > On Wed, Jan 10, 2018 at 5:14 AM, Ismael Juma 
> wrote:
> > >
> > > > Thanks Rajini. Sounds good.
> > > >
> > > > Ismael
> > > >
> > > > On Wed, Jan 10, 2018 at 11:41 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Ismael,
> > > > >
> > > > > I have updated the KIP to use AES-256 if available and AES-128
> > > otherwise
> > > > > for password encryption. Looking at GCM, it looks like GCM is
> > typically
> > > > > used with a variable initialization vector, while we are using a
> > > random,
> > > > > but constant IV per-password. Also, AES/GCM is not supported by
> > Java7.
> > > > > Since the authentication and performance benefits of GCM are not
> > > required
> > > > > for this scenario, I am thinking I will leave the default as CBC,
> but
> > > > make
> > > > > sure we test GCM as well so that users have the choice.
> > > > >
> > > > > On Wed, Jan 10, 2018 at 1:01 AM, Colin McCabe 
> > > > wrote:
> > > > >
> > > > > > Thanks, Rajini.  That makes sense.
> > > > > >
> > > > > > regards,
> > > > > > Colin
> > > > > >
> > > > > > On Tue, Jan 9, 2018, at 14:38, Rajini Sivaram wrote:
> > > > > > > Hi Colin,
> > > > > > >
> > > > > > > Thank you for reviewing.
> > > > > > >
> > > > > > > Yes, validation is done on the broker, not the client.
> > > > > > >
> > > > > > > All configs from ZooKeeper are processed and any config that
> > could
> > > > not
> > > > > be
> > > > > > > applied are logged as warnings. This includes any configs that
> > are
> > > > not
> > > > > > > dynamic in the broker version or any configs that are not
> > supported
> > > > in
> > > > > > the
> > > > > > > broker version. If you downgrade to a version that is older
> than
> > > this
> > > > > KIP
> > > > > > > (1.0 for example), then you don't get any warnings however.
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Jan 9, 2018 at 9:38 PM, Colin McCabe <
> cmcc...@apache.org
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > On Mon, Dec 18, 2017, at 13:40, Jason Gustafson wrote:
> > > > > > > > > Hi Rajini,
> > > > > > > > >
> > > > > > > > > Looking good. Just a few questions.
> > > > > > > > >
> > > > > > > > > 1. (Related to Jay's comment) Is the validate() method on
> > > > > > 

Re: [VOTE] KIP-232: Detect outdated metadata using leaderEpoch and partitionEpoch

2018-01-24 Thread Dong Lin
Thanks much for reviewing the KIP!

Dong

On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang  wrote:

> Yeah that makes sense, again I'm just making sure we understand all the
> scenarios and what to expect.
>
> I agree that if, more generally speaking, say users have only consumed to
> offset 8, and then call seek(16) to "jump" to a further position, then she
> needs to be aware that OORE may be thrown and she needs to handle it or rely
> on reset policy which should not surprise her.
>
>
> I'm +1 on the KIP.
>
> Guozhang
>
>
> On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin  wrote:
>
> > Yes, in general we can not prevent OffsetOutOfRangeException if user
> seeks
> > to a wrong offset. The main goal is to prevent OffsetOutOfRangeException
> if
> > user has done things in the right way, e.g. user should know that there
> is
> > message with this offset.
> >
> > For example, if user calls seek(..) right after construction, the only
> > reason I can think of is that user stores offset externally. In this
> case,
> > user currently needs to use the offset which is obtained using
> position(..)
> > from the last run. With this KIP, user needs to get the offset and the
> > offsetEpoch using positionAndOffsetEpoch(...) and stores these
> information
> > externally. The next time user starts consumer, he/she needs to call
> > seek(..., offset, offsetEpoch) right after construction. Then KIP should
> be
> > able to ensure that we don't throw OffsetOutOfRangeException if there is
> no
> > unclean leader election.
> >
> > Does this sound OK?
> >
> > Regards,
> > Dong
> >
> >
> > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang 
> > wrote:
> >
> > > "If consumer wants to consume message with offset 16, then consumer
> must
> > > have
> > > already fetched message with offset 15"
> > >
> > > --> this may not be always true right? What if consumer just call
> > seek(16)
> > > after construction and then poll without committed offset ever stored
> > > before? Admittedly it is rare but we do not programmatically disallow it.
> > >
> > >
> > > Guozhang
> > >
> > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin 
> wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > Thanks much for reviewing the KIP!
> > > >
> > > > In the scenario you described, let's assume that broker A has
> messages
> > > with
> > > > offset up to 10, and broker B has messages with offset up to 20. If
> > > > consumer wants to consume message with offset 9, it will not receive
> > > > OffsetOutOfRangeException
> > > > from broker A.
> > > >
> > > > If consumer wants to consume message with offset 16, then consumer
> must
> > > > have already fetched message with offset 15, which can only come from
> > > > broker B. Because consumer will fetch from broker B only if
> leaderEpoch
> > > >=
> > > > 2, then the current consumer leaderEpoch can not be 1 since this KIP
> > > > prevents leaderEpoch rewind. Thus we will not have
> > > > OffsetOutOfRangeException
> > > > in this case.
> > > >
> > > > Does this address your question, or maybe there is more advanced
> > scenario
> > > > that the KIP does not handle?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Thanks Dong, I made a pass over the wiki and it lgtm.
> > > > >
> > > > > Just a quick question: can we completely eliminate the
> > > > > OffsetOutOfRangeException with this approach? Say if there is
> > > consecutive
> > > > > leader changes such that the cached metadata's partition epoch is
> 1,
> > > and
> > > > > the metadata fetch response returns  with partition epoch 2
> pointing
> > to
> > > > > leader broker A, while the actual up-to-date metadata has partition
> > > > epoch 3
> > > > > whose leader is now broker B, the metadata refresh will still
> succeed
> > > and
> > > > > the follow-up fetch request may still see OORE?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin 
> > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I would like to start the voting process for KIP-232:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 232%3A+Detect+outdated+metadata+using+leaderEpoch+
> > and+partitionEpoch
> > > > > >
> > > > > > The KIP will help fix a concurrency issue in Kafka which
> currently
> > > can
> > > > > > cause message loss or message duplication in consumer.
> > > > > >
> > > > > > Regards,
> > > > > > Dong
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-86: Configurable SASL callback handlers

2018-01-24 Thread Rajini Sivaram
Thanks everyone for the feedback and votes.

The vote has passed with 3 binding votes (Jun, Ismael, me) and 6
non-binding votes (Edo, Mickael, Tom, Ted, Tao, Manikumar).

I will update the KIP page.

Regards,

Rajini

On Wed, Jan 24, 2018 at 7:26 AM, Ismael Juma  wrote:

> Thanks for the KIP, Rajini. This is a useful improvement, so +1 (binding)
> from me.
>
> I really don't like how the Java Security classes work, so I would have
> preferred to avoid emulating them, but the KIP is consistent with previous
> related KIPs and that's the direction we chose previously. Also, I think I
> might have tried to reduce the number of configs from 2 to 1 (in broker and
> client) by relying more on Java, but I don't have a concrete proposal and
> it would result in a larger API surface area.
>
> Ismael
>
> On Thu, Apr 6, 2017 at 2:53 AM, Rajini Sivaram 
> wrote:
>
> > Hi all,
> >
> > I would like to start the voting process for KIP-86:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 86%3A+Configurable+SASL+callback+handlers
> >
> > The KIP makes callback handlers for SASL configurable to make it simpler
> to
> > integrate with custom authentication database or custom authentication
> > servers. This is particularly useful for SASL/PLAIN where the
> > implementation in Kafka based on credentials stored in jaas.conf is not
> > suitable for production use. It is also useful for SCRAM in environments
> > where ZooKeeper is not secure.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
> >
>
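
The callback-handler mechanism described above boils down to the broker loading
a user-supplied class and handing it the SASL callbacks to resolve. A rough
sketch of a server-side PLAIN handler backed by an external credential store
might look like the following; the AuthenticateCallbackHandler interface and
the PlainAuthenticateCallback class names follow the KIP's proposal and should
be treated as assumptions rather than a settled API:

import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

public class ExternalStorePlainCallbackHandler implements AuthenticateCallbackHandler {

    @Override
    public void configure(Map<String, ?> configs, String saslMechanism,
                          List<AppConfigurationEntry> jaasConfigEntries) {
        // e.g. open a connection to the external credential store here
    }

    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plain = (PlainAuthenticateCallback) callback;
                plain.authenticated(checkPassword(username, plain.password()));
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    @Override
    public void close() { }

    // Hypothetical lookup against whatever store actually holds the credentials.
    private boolean checkPassword(String username, char[] password) {
        return username != null && password != null && password.length > 0;  // placeholder check
    }
}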


Re: [VOTE] KIP-86: Configurable SASL callback handlers

2018-01-24 Thread Ismael Juma
Thanks for the KIP, Rajini. This is a useful improvement, so +1 (binding)
from me.

I really don't like how the Java Security classes work, so I would have
preferred to avoid emulating them, but the KIP is consistent with previous
related KIPs and that's the direction we chose previously. Also, I think I
might have tried to reduce the number of configs from 2 to 1 (in broker and
client) by relying more on Java, but I don't have a concrete proposal and
it would result in a larger API surface area.

Ismael

On Thu, Apr 6, 2017 at 2:53 AM, Rajini Sivaram 
wrote:

> Hi all,
>
> I would like to start the voting process for KIP-86:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 86%3A+Configurable+SASL+callback+handlers
>
> The KIP makes callback handlers for SASL configurable to make it simpler to
> integrate with custom authentication database or custom authentication
> servers. This is particularly useful for SASL/PLAIN where the
> implementation in Kafka based on credentials stored in jaas.conf is not
> suitable for production use. It is also useful for SCRAM in environments
> where ZooKeeper is not secure.
>
> Thank you...
>
> Regards,
>
> Rajini
>


Re: [VOTE] KIP-232: Detect outdated metadata using leaderEpoch and partitionEpoch

2018-01-24 Thread Guozhang Wang
Yeah that makes sense, again I'm just making sure we understand all the
scenarios and what to expect.

I agree that if, more generally speaking, say users have only consumed to
offset 8, and then call seek(16) to "jump" to a further position, then she
needs to be aware that OORE may be thrown and she needs to handle it or rely
on reset policy which should not surprise her.


I'm +1 on the KIP.

Guozhang


On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin  wrote:

> Yes, in general we can not prevent OffsetOutOfRangeException if user seeks
> to a wrong offset. The main goal is to prevent OffsetOutOfRangeException if
> user has done things in the right way, e.g. user should know that there is
> message with this offset.
>
> For example, if user calls seek(..) right after construction, the only
> reason I can think of is that user stores offset externally. In this case,
> user currently needs to use the offset which is obtained using position(..)
> from the last run. With this KIP, user needs to get the offset and the
> offsetEpoch using positionAndOffsetEpoch(...) and stores these information
> externally. The next time user starts consumer, he/she needs to call
> seek(..., offset, offsetEpoch) right after construction. Then KIP should be
> able to ensure that we don't throw OffsetOutOfRangeException if there is no
> unclean leader election.
>
> Does this sound OK?
>
> Regards,
> Dong
>
>
> On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang 
> wrote:
>
> > "If consumer wants to consume message with offset 16, then consumer must
> > have
> > already fetched message with offset 15"
> >
> > --> this may not be always true right? What if consumer just call
> seek(16)
> > after construction and then poll without committed offset ever stored
> > before? Admittedly it is rare but we do not programmatically disallow it.
> >
> >
> > Guozhang
> >
> > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin  wrote:
> >
> > > Hey Guozhang,
> > >
> > > Thanks much for reviewing the KIP!
> > >
> > > In the scenario you described, let's assume that broker A has messages
> > with
> > > offset up to 10, and broker B has messages with offset up to 20. If
> > > consumer wants to consume message with offset 9, it will not receive
> > > OffsetOutOfRangeException
> > > from broker A.
> > >
> > > If consumer wants to consume message with offset 16, then consumer must
> > > have already fetched message with offset 15, which can only come from
> > > broker B. Because consumer will fetch from broker B only if leaderEpoch
> > >=
> > > 2, then the current consumer leaderEpoch can not be 1 since this KIP
> > > prevents leaderEpoch rewind. Thus we will not have
> > > OffsetOutOfRangeException
> > > in this case.
> > >
> > > Does this address your question, or maybe there is more advanced
> scenario
> > > that the KIP does not handle?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Thanks Dong, I made a pass over the wiki and it lgtm.
> > > >
> > > > Just a quick question: can we completely eliminate the
> > > > OffsetOutOfRangeException with this approach? Say if there is
> > consecutive
> > > > leader changes such that the cached metadata's partition epoch is 1,
> > and
> > > > the metadata fetch response returns  with partition epoch 2 pointing
> to
> > > > leader broker A, while the actual up-to-date metadata has partition
> > > epoch 3
> > > > whose leader is now broker B, the metadata refresh will still succeed
> > and
> > > > the follow-up fetch request may still see OORE?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin 
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to start the voting process for KIP-232:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 232%3A+Detect+outdated+metadata+using+leaderEpoch+
> and+partitionEpoch
> > > > >
> > > > > The KIP will help fix a concurrency issue in Kafka which currently
> > can
> > > > > cause message loss or message duplication in consumer.
> > > > >
> > > > > Regards,
> > > > > Dong
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2018-01-24 Thread Viktor Somogyi
Hi all,

I'd also like to ask the community members here who were participating in
the discussion of KIP-226 to take a look at KIP-248 (that is making
kafka-configs.sh fully function with AdminClient and a Java based
ConfigCommand). It would be much appreciated to get feedback on that as it
plays an important role for KIP-226 and other long-waited features.

Thanks,
Viktor

On Wed, Jan 24, 2018 at 6:56 AM, Ismael Juma  wrote:

> Hi Rajini,
>
> I think the proposal makes sense. One suggestion: can we just allow the
> config to be passed? That is, leave out the properties config for now.
>
> On Tue, Jan 23, 2018 at 3:01 PM, Rajini Sivaram 
> wrote:
>
> > Since we are running out of time to get the whole ConfigCommand converted
> > to using the new AdminClient for 1.1.0 (KIP-248), we need a way to enable
> > ConfigCommand to handle broker config updates (implemented by KIP-226).
> As
> > a simple first step, it would make sense to use the existing
> ConfigCommand
> > tool to perform broker config updates enabled by this KIP. Since config
> > validation and password encryption are performed by the broker, this will
> > be easier to do with the new AdminClient. To do this, we need to add
> > command line options for new admin client to kafka-configs.sh. Dynamic
> > broker config updates alone will be done under KIP-226 using the new
> admin
> > client to make this feature usable. The new command line options
> > (consistent with KIP-248) that will be added to ConfigCommand will be:
> >
> >- --bootstrap-server *host:port*
> >- --adminclient.config *config-file*
> >- --adminclient.properties *k1=v1,k2=v2*
> >
> > If anyone has any concerns about these options being added to
> > kafka-configs.sh, please let me know. Otherwise, I will update KIP-226
> and
> > add the options to one of the KIP-226 PRs.
> >
> > Thank you,
> >
> > Rajini
> >
> > On Wed, Jan 10, 2018 at 5:14 AM, Ismael Juma  wrote:
> >
> > > Thanks Rajini. Sounds good.
> > >
> > > Ismael
> > >
> > > On Wed, Jan 10, 2018 at 11:41 AM, Rajini Sivaram <
> > rajinisiva...@gmail.com>
> > > wrote:
> > >
> > > > Hi Ismael,
> > > >
> > > > I have updated the KIP to use AES-256 if available and AES-128
> > otherwise
> > > > for password encryption. Looking at GCM, it looks like GCM is
> typically
> > > > used with a variable initialization vector, while we are using a
> > random,
> > > > but constant IV per-password. Also, AES/GCM is not supported by
> Java7.
> > > > Since the authentication and performance benefits of GCM are not
> > required
> > > > for this scenario, I am thinking I will leave the default as CBC, but
> > > make
> > > > sure we test GCM as well so that users have the choice.
> > > >
> > > > On Wed, Jan 10, 2018 at 1:01 AM, Colin McCabe 
> > > wrote:
> > > >
> > > > > Thanks, Rajini.  That makes sense.
> > > > >
> > > > > regards,
> > > > > Colin
> > > > >
> > > > > On Tue, Jan 9, 2018, at 14:38, Rajini Sivaram wrote:
> > > > > > Hi Colin,
> > > > > >
> > > > > > Thank you for reviewing.
> > > > > >
> > > > > > Yes, validation is done on the broker, not the client.
> > > > > >
> > > > > > All configs from ZooKeeper are processed and any config that
> could
> > > not
> > > > be
> > > > > > applied are logged as warnings. This includes any configs that
> are
> > > not
> > > > > > dynamic in the broker version or any configs that are not
> supported
> > > in
> > > > > the
> > > > > > broker version. If you downgrade to a version that is older than
> > this
> > > > KIP
> > > > > > (1.0 for example), then you don't get any warnings however.
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 9, 2018 at 9:38 PM, Colin McCabe  >
> > > > wrote:
> > > > > >
> > > > > > > On Mon, Dec 18, 2017, at 13:40, Jason Gustafson wrote:
> > > > > > > > Hi Rajini,
> > > > > > > >
> > > > > > > > Looking good. Just a few questions.
> > > > > > > >
> > > > > > > > 1. (Related to Jay's comment) Is the validate() method on
> > > > > Reconfigurable
> > > > > > > > necessary? I would have thought we'd validate using the
> > > ConfigDef.
> > > > > Do you
> > > > > > > > have a use case in mind in which the reconfigurable component
> > > only
> > > > > > > permits
> > > > > > > > certain reconfigurations?
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Sorry if this is a dumb question, but when we talk about
> > validating
> > > > on
> > > > > the
> > > > > > > ConfigDef, we're talking about validating on the server side,
> > > right?
> > > > > The
> > > > > > > software on the client side might be older or newer than the
> > > software
> > > > > on
> > > > > > > the broker side, so it seems inadvisable to do the validation
> > > there.
> > > > > > >
> > > > > > > Also, after a software downgrade, when the broker is restarted,
> > it
> > > > > might
> > > > > > > find that there is a configuration key that is stored in ZK
> that
> > is
> > > > not
> > > > > > > 

Re: offsetsForTimes API performance

2018-01-24 Thread srimugunthan dhandapani
Does the performance of the Kafka APIs (
https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
) depend on how geographically far apart the caller of the API is from the
Kafka cluster?
Do all APIs perform faster if the calls are made from a machine co-located
with the Kafka cluster?
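
For context, the offsetsForTimes-then-seek pattern described below costs at
least two broker round trips before any records arrive: one ListOffsets lookup
behind offsetsForTimes, then the fetch triggered by the next poll after the
seek, so distance to the cluster shows up directly in the measured latency.
A minimal sketch of the pattern; the bootstrap address, topic, partition and
timestamp are placeholders:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

Properties config = new Properties();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);

TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(tp));
long targetTimestamp = System.currentTimeMillis() - 60_000;   // one minute ago

// Round trip 1: ask the brokers for the earliest offset whose timestamp is >= target.
Map<TopicPartition, OffsetAndTimestamp> offsets =
        consumer.offsetsForTimes(Collections.singletonMap(tp, targetTimestamp));
OffsetAndTimestamp found = offsets.get(tp);
if (found != null)
    consumer.seek(tp, found.offset());   // local bookkeeping only, no network call

// Round trip 2 (and onwards): the actual fetch from the chosen position.
consumer.poll(1000);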


On Mon, Jan 22, 2018 at 8:54 PM, Andrew Otto  wrote:

> Speaking of, has there been any talk of combining those two requests into a
> single API call?  I’d assume that offsetsForTimes + consumer seek is
> probably the most common use case of offsetsForTimes.  Maybe a round trip
> could be avoided if the broker could just auto-assign the consumer to the
> offset for a timestamp.
>
>
> On Mon, Jan 22, 2018 at 9:59 AM, srimugunthan dhandapani <
> srimugunthan.dhandap...@gmail.com> wrote:
>
> >  Hi all,
> >
> > We use Kafka as our store and every one of our records is associated
> with a
> > timestamp. We pull data from Kafka by seeking to a timestamp offset
> > every time and then get the records by polling. We use KafkaConsumer's
> > offsetsForTimes (
> > https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/
> > KafkaConsumer.html#offsetsForTimes(java.util.Map)
> >
> > ) API to  find offset and seek to a particular time offset.
> >
> >  We see that using the offsetsForTimes API and the subsequent seek to the
> > offset takes anything from 17 milliseconds to 500 milliseconds per iteration.
> >
> > I would like to know if anybody has done any performance testing of the
> > offsetsForTimes API and what does the performance of the API depend on?
> > Will the API be slower if there is more data in Kafka?
> >
> >
> > thanks,
> > mugunthan
> >
>


Re: [VOTE] KIP-86: Configurable SASL callback handlers

2018-01-24 Thread Ted Yu
+1
 Original message From: Manikumar  
Date: 1/24/18  3:07 AM  (GMT-08:00) To: dev@kafka.apache.org Subject: Re: 
[VOTE] KIP-86: Configurable SASL callback handlers 
Hi,

+1 (non-binding)

Thanks for the KIP.

On Wed, Jan 24, 2018 at 5:00 AM, Jun Rao  wrote:

> Hi, Rajini,
>
> Thanks for the KIP. +1 from me.
>
> Jun
>
> On Thu, Jan 18, 2018 at 8:58 AM, tao xiao  wrote:
>
> >  +1 (non-binding)
> >
> > On Fri, 19 Jan 2018 at 00:47 Rajini Sivaram 
> > wrote:
> >
> > > Hi all,
> > >
> > > I would like to restart the vote for KIP-86:
> > >    https://cwiki.apache.org/confluence/display/KAFKA/KIP-86
> > > %3A+Configurable+SASL+callback+handlers
> > >
> > > The KIP makes callback handlers for SASL configurable to make it
> simpler
> > to
> > > integrate with custom authentication database or custom authentication
> > > servers. This is particularly useful for SASL/PLAIN where the
> > > implementation in Kafka based on credentials stored in jaas.conf is not
> > > suitable for production use. It is also useful for SCRAM in
> environments
> > > where ZooKeeper is not secure. The KIP has also been updated to
> simplify
> > > addition of new SASL mechanisms by making the Login class configurable.
> > >
> > > The PR for the KIP has been rebased and updated (
> > > https://github.com/apache/kafka/pull/2022)
> > >
> > > Thank you,
> > >
> > > Rajini
> > >
> > >
> > >
> > > On Mon, Dec 11, 2017 at 2:22 PM, Ted Yu  wrote:
> > >
> > > > +1
> > > >  Original message From: Tom Bentley <
> > > t.j.bent...@gmail.com>
> > > > Date: 12/11/17  6:06 AM  (GMT-08:00) To: dev@kafka.apache.org
> Subject:
> > > > Re: [VOTE] KIP-86: Configurable SASL callback handlers
> > > > +1 (non-binding)
> > > >
> > > > On 5 May 2017 at 11:57, Mickael Maison 
> > wrote:
> > > >
> > > > > Thanks for the KIP Rajini, this will significantly simplify
> providing
> > > > > custom credential providers
> > > > > +1 (non binding)
> > > > >
> > > > > On Wed, May 3, 2017 at 8:25 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com>
> > > > > wrote:
> > > > > > Can we have some more reviews or votes for this KIP to include in
> > > > > 0.11.0.0?
> > > > > > It is not a breaking change and the code is ready for
> integration,
> > so
> > > > it
> > > > > > will be good to get it in if possible.
> > > > > >
> > > > > > Ismael/Jun, since you had reviewed the KIP earlier, can you let
> me
> > > know
> > > > > if
> > > > > > I can do anything more to get your votes?
> > > > > >
> > > > > >
> > > > > > Thank you,
> > > > > >
> > > > > > Rajini
> > > > > >
> > > > > >
> > > > > > On Mon, Apr 10, 2017 at 12:18 PM, Edoardo Comar <
> eco...@uk.ibm.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> +1 (non binding)
> > > > > >> many thanks Rajini !
> > > > > >>
> > > > > >> --
> > > > > >> Edoardo Comar
> > > > > >> IBM MessageHub
> > > > > >> eco...@uk.ibm.com
> > > > > >> IBM UK Ltd, Hursley Park, SO21 2JN
> > > > > >>
> > > > > >> IBM United Kingdom Limited Registered in England and Wales with
> > > number
> > > > > >> 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
> > > Hants.
> > > > > PO6
> > > > > >> 3AU
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> From:   Rajini Sivaram 
> > > > > >> To: dev@kafka.apache.org
> > > > > >> Date:   06/04/2017 10:53
> > > > > >> Subject:    [VOTE] KIP-86: Configurable SASL callback
> handlers
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> Hi all,
> > > > > >>
> > > > > >> I would like to start the voting process for KIP-86:
> > > > > >>
> > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > >> 86%3A+Configurable+SASL+callback+handlers
> > > > > >>
> > > > > >>
> > > > > >> The KIP makes callback handlers for SASL configurable to make it
> > > > simpler
> > > > > >> to
> > > > > >> integrate with custom authentication database or custom
> > > authentication
> > > > > >> servers. This is particularly useful for SASL/PLAIN where the
> > > > > >> implementation in Kafka based on credentials stored in jaas.conf
> > is
> > > > not
> > > > > >> suitable for production use. It is also useful for SCRAM in
> > > > environments
> > > > > >> where ZooKeeper is not secure.
> > > > > >>
> > > > > >> Thank you...
> > > > > >>
> > > > > >> Regards,
> > > > > >>
> > > > > >> Rajini
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> Unless stated otherwise above:
> > > > > >> IBM United Kingdom Limited - Registered in England and Wales
> with
> > > > number
> > > > > >> 741598.
> > > > > >> Registered office: PO Box 41, North Harbour, Portsmouth,
> Hampshire
> > > PO6
> > > > > 3AU
> > > > > >>
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-86: Configurable SASL callback handlers

2018-01-24 Thread Manikumar
Hi,

+1 (non-binding)

Thanks for the KIP.

On Wed, Jan 24, 2018 at 5:00 AM, Jun Rao  wrote:

> Hi, Rajini,
>
> Thanks for the KIP. +1 from me.
>
> Jun
>
> On Thu, Jan 18, 2018 at 8:58 AM, tao xiao  wrote:
>
> >  +1 (non-binding)
> >
> > On Fri, 19 Jan 2018 at 00:47 Rajini Sivaram 
> > wrote:
> >
> > > Hi all,
> > >
> > > I would like to restart the vote for KIP-86:
> > >https://cwiki.apache.org/confluence/display/KAFKA/KIP-86
> > > %3A+Configurable+SASL+callback+handlers
> > >
> > > The KIP makes callback handlers for SASL configurable to make it
> simpler
> > to
> > > integrate with custom authentication database or custom authentication
> > > servers. This is particularly useful for SASL/PLAIN where the
> > > implementation in Kafka based on credentials stored in jaas.conf is not
> > > suitable for production use. It is also useful for SCRAM in
> environments
> > > where ZooKeeper is not secure. The KIP has also been updated to
> simplify
> > > addition of new SASL mechanisms by making the Login class configurable.
> > >
> > > The PR for the KIP has been rebased and updated (
> > > https://github.com/apache/kafka/pull/2022)
> > >
> > > Thank you,
> > >
> > > Rajini
> > >
> > >
> > >
> > > On Mon, Dec 11, 2017 at 2:22 PM, Ted Yu  wrote:
> > >
> > > > +1
> > > >  Original message From: Tom Bentley <
> > > t.j.bent...@gmail.com>
> > > > Date: 12/11/17  6:06 AM  (GMT-08:00) To: dev@kafka.apache.org
> Subject:
> > > > Re: [VOTE] KIP-86: Configurable SASL callback handlers
> > > > +1 (non-binding)
> > > >
> > > > On 5 May 2017 at 11:57, Mickael Maison 
> > wrote:
> > > >
> > > > > Thanks for the KIP Rajini, this will significantly simplify
> providing
> > > > > custom credential providers
> > > > > +1 (non binding)
> > > > >
> > > > > On Wed, May 3, 2017 at 8:25 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com>
> > > > > wrote:
> > > > > > Can we have some more reviews or votes for this KIP to include in
> > > > > 0.11.0.0?
> > > > > > It is not a breaking change and the code is ready for
> integration,
> > so
> > > > it
> > > > > > will be good to get it in if possible.
> > > > > >
> > > > > > Ismael/Jun, since you had reviewed the KIP earlier, can you let
> me
> > > know
> > > > > if
> > > > > > I can do anything more to get your votes?
> > > > > >
> > > > > >
> > > > > > Thank you,
> > > > > >
> > > > > > Rajini
> > > > > >
> > > > > >
> > > > > > On Mon, Apr 10, 2017 at 12:18 PM, Edoardo Comar <
> eco...@uk.ibm.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> +1 (non binding)
> > > > > >> many thanks Rajini !
> > > > > >>
> > > > > >> --
> > > > > >> Edoardo Comar
> > > > > >> IBM MessageHub
> > > > > >> eco...@uk.ibm.com
> > > > > >> IBM UK Ltd, Hursley Park, SO21 2JN
> > > > > >>
> > > > > >> IBM United Kingdom Limited Registered in England and Wales with
> > > number
> > > > > >> 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
> > > Hants.
> > > > > PO6
> > > > > >> 3AU
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> From:   Rajini Sivaram 
> > > > > >> To: dev@kafka.apache.org
> > > > > >> Date:   06/04/2017 10:53
> > > > > >> Subject:[VOTE] KIP-86: Configurable SASL callback
> handlers
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> Hi all,
> > > > > >>
> > > > > >> I would like to start the voting process for KIP-86:
> > > > > >>
> > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > >> 86%3A+Configurable+SASL+callback+handlers
> > > > > >>
> > > > > >>
> > > > > >> The KIP makes callback handlers for SASL configurable to make it
> > > > > >> simpler to integrate with a custom authentication database or
> > > > > >> custom authentication servers. This is particularly useful for
> > > > > >> SASL/PLAIN, where the implementation in Kafka based on credentials
> > > > > >> stored in jaas.conf is not suitable for production use. It is also
> > > > > >> useful for SCRAM in environments where ZooKeeper is not secure.
> > > > > >>
> > > > > >> Thank you...
> > > > > >>
> > > > > >> Regards,
> > > > > >>
> > > > > >> Rajini
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> Unless stated otherwise above:
> > > > > >> IBM United Kingdom Limited - Registered in England and Wales
> with
> > > > number
> > > > > >> 741598.
> > > > > >> Registered office: PO Box 41, North Harbour, Portsmouth,
> Hampshire
> > > PO6
> > > > > 3AU
> > > > > >>
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (KAFKA-6478) kafka-run-class.bat fails if CLASSPATH contains spaces

2018-01-24 Thread Bert Roos (JIRA)
Bert Roos created KAFKA-6478:


 Summary: kafka-run-class.bat fails if CLASSPATH contains spaces
 Key: KAFKA-6478
 URL: https://issues.apache.org/jira/browse/KAFKA-6478
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 1.0.0
Reporter: Bert Roos


If the CLASSPATH environment variable contains spaces, the script 
{{kafka-run-class.bat}} fails to start.

The easy solution is to put quotes around it. See [PR 
#4469|https://github.com/apache/kafka/pull/4469] for a fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-249: Add Delegation Token Operations to Kafka Admin Client

2018-01-24 Thread Satish Duggana
 +1, thanks for the KIP.

~Satish.

On Wed, Jan 24, 2018 at 5:09 AM, Jun Rao  wrote:

> Hi, Mani,
>
> Thanks for the KIP. +1
>
> Jun
>
> On Sun, Jan 21, 2018 at 7:44 AM, Manikumar 
> wrote:
>
> > Hi All,
> >
> > I would like to start a vote on KIP-249, which would add delegation token
> > operations to the Java Admin Client.
> >
> > We have recently merged the DelegationToken API PR. We want to include the
> > admin client changes in the upcoming release, which will make the feature
> > complete.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 249%3A+Add+Delegation+Token+Operations+to+KafkaAdminClient
> >
> > Thanks,
> >
>
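
For readers skimming the vote, a rough sketch of what the new operations would
look like from the Java Admin Client follows. The method and class names
(createDelegationToken, renewDelegationToken, expireDelegationToken,
describeDelegationToken, CreateDelegationTokenOptions) follow the KIP proposal
and the released API may differ; the broker address below is a placeholder.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.token.delegation.DelegationToken;

public class DelegationTokenExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder address
        // The admin client must itself authenticate (e.g. via SASL) to request tokens.

        try (AdminClient admin = AdminClient.create(props)) {
            // Create a token that the principal User:spark is allowed to renew.
            CreateDelegationTokenOptions options = new CreateDelegationTokenOptions()
                    .renewers(Collections.singletonList(new KafkaPrincipal("User", "spark")));
            DelegationToken token = admin.createDelegationToken(options)
                    .delegationToken().get();
            System.out.println("Created token " + token.tokenInfo().tokenId());

            // Renew, and eventually expire, the token using its HMAC.
            long newExpiry = admin.renewDelegationToken(token.hmac()).expiryTimestamp().get();
            System.out.println("Token renewed until " + newExpiry);
            admin.expireDelegationToken(token.hmac()).expiryTimestamp().get();

            // List the tokens the authenticated principal is allowed to describe.
            admin.describeDelegationToken().delegationTokens().get()
                    .forEach(t -> System.out.println(t.tokenInfo().tokenId()));
        }
    }
}
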


Re: [VOTE] KIP-232: Detect outdated metadata using leaderEpoch and partitionEpoch

2018-01-24 Thread Dong Lin
Yes, in general we cannot prevent OffsetOutOfRangeException if the user seeks
to a wrong offset. The main goal is to prevent OffsetOutOfRangeException if
the user has done things in the right way, e.g. the user should know that
there is a message with this offset.

For example, if the user calls seek(..) right after construction, the only
reason I can think of is that the user stores offsets externally. In this
case, the user currently needs to use the offset obtained using position(..)
from the last run. With this KIP, the user needs to get the offset and the
offsetEpoch using positionAndOffsetEpoch(...) and store this information
externally. The next time the user starts the consumer, he/she needs to call
seek(..., offset, offsetEpoch) right after construction. Then the KIP should
be able to ensure that we don't throw OffsetOutOfRangeException if there is
no unclean leader election.

Does this sound OK?

Regards,
Dong
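
To make the flow above concrete, here is a minimal sketch of a consumer that
stores its position externally, written against today's consumer API. The
comments mark where the positionAndOffsetEpoch(...) and
seek(..., offset, offsetEpoch) calls proposed by this KIP would slot in; those
two methods are the KIP's proposal rather than an existing API, and the
external-store helpers and broker address are hypothetical placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExternalOffsetStoreExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");            // placeholder address
        props.put("enable.auto.commit", "false");                  // offsets live in an external store
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("test", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));

            // Restore the position saved by the previous run.
            long storedOffset = loadOffsetFromExternalStore(tp);    // hypothetical helper
            consumer.seek(tp, storedOffset);
            // With this KIP the stored offsetEpoch would be passed as well, e.g.
            // consumer.seek(tp, storedOffset, storedOffsetEpoch);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records)
                process(record);                                    // hypothetical helper

            // Persist the position for the next run.
            long nextOffset = consumer.position(tp);
            // With this KIP this would instead be something like
            //   PositionAndOffsetEpoch p = consumer.positionAndOffsetEpoch(tp);
            // and both the offset and the offsetEpoch would be stored.
            saveOffsetToExternalStore(tp, nextOffset);              // hypothetical helper
        }
    }

    private static long loadOffsetFromExternalStore(TopicPartition tp) { return 0L; }
    private static void saveOffsetToExternalStore(TopicPartition tp, long offset) { }
    private static void process(ConsumerRecord<String, String> record) { }
}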


On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang  wrote:

> "If consumer wants to consume message with offset 16, then consumer must
> have
> already fetched message with offset 15"
>
> --> this may not be always true right? What if consumer just call seek(16)
> after construction and then poll without committed offset ever stored
> before? Admittedly it is rare but we do not programmably disallow it.
>
>
> Guozhang
>
> On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin  wrote:
>
> > Hey Guozhang,
> >
> > Thanks much for reviewing the KIP!
> >
> > In the scenario you described, let's assume that broker A has messages
> > with offset up to 10, and broker B has messages with offset up to 20. If
> > the consumer wants to consume the message with offset 9, it will not
> > receive OffsetOutOfRangeException from broker A.
> >
> > If the consumer wants to consume the message with offset 16, then the
> > consumer must have already fetched the message with offset 15, which can
> > only come from broker B. Because the consumer will fetch from broker B
> > only if leaderEpoch >= 2, the current consumer leaderEpoch cannot be 1,
> > since this KIP prevents leaderEpoch rewind. Thus we will not have
> > OffsetOutOfRangeException in this case.
> >
> > Does this address your question, or maybe there is more advanced scenario
> > that the KIP does not handle?
> >
> > Thanks,
> > Dong
> >
> > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang 
> wrote:
> >
> > > Thanks Dong, I made a pass over the wiki and it lgtm.
> > >
> > > Just a quick question: can we completely eliminate the
> > > OffsetOutOfRangeException with this approach? Say there are consecutive
> > > leader changes such that the cached metadata's partition epoch is 1 and
> > > the metadata fetch response returns partition epoch 2 pointing to leader
> > > broker A, while the actual up-to-date metadata has partition epoch 3
> > > whose leader is now broker B; the metadata refresh will still succeed
> > > and the follow-up fetch request may still see OORE?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to start the voting process for KIP-232:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 232%3A+Detect+outdated+metadata+using+leaderEpoch+and+partitionEpoch
> > > >
> > > > The KIP will help fix a concurrency issue in Kafka which can currently
> > > > cause message loss or message duplication in the consumer.
> > > >
> > > > Regards,
> > > > Dong
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>