Re: [VOTE] - KIP-213 Support non-key joining in KTable

2018-11-26 Thread Jan Filipiak


On 07.11.2018 22:24, Adam Bellemare wrote:
> Bumping this thread, as per convention - 1
>
> On Fri, Nov 2, 2018 at 8:22 AM Adam Bellemare 
> wrote:
>
>> As expected :) But still, thanks none-the-less!
>>
>> On Fri, Nov 2, 2018 at 3:36 AM Jan Filipiak 
>> wrote:
>>
>>> reminder
>>>
>>> On 30.10.2018 15:47, Adam Bellemare wrote:
 Hi All

 I would like to call a vote on

>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>> .
 This allows a Kafka Streams DSL user to perform KTable to KTable
 foreign-key joins on their data. I have been using this in production for
 some time and I have composed a PR that enables this. It is a fairly
 extensive PR, but I believe it will add considerable value to the Kafka
 Streams DSL.
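For readers new to the feature, a rough sketch of how a foreign-key KTable join could look in the Streams DSL follows. The method shape and the domain (orders referencing customers by a foreign key) are illustrative assumptions only; the exact API is the one defined in KIP-213 and the PR linked below.

    // Illustrative sketch only -- the real signature is defined by KIP-213 / the linked PR.
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    StreamsBuilder builder = new StreamsBuilder();
    // Left table: orders keyed by orderId; the value holds the foreign key (customerId).
    KTable<String, String> orders = builder.table("orders");
    // Right table: customers keyed by customerId.
    KTable<String, String> customers = builder.table("customers");

    // Hypothetical foreign-key join: a per-record extractor picks the customerId out of
    // the order value and joins it against the customers table, keeping the orders keys.
    KTable<String, String> enriched = orders.join(
        customers,
        orderValue -> orderValue,                          // foreign-key extractor (here the value is the customerId)
        (orderValue, customerValue) -> orderValue + "/" + customerValue);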

 The PR can be found here:
 https://github.com/apache/kafka/pull/5527

 See
>>> http://mail-archives.apache.org/mod_mbox/kafka-dev/201810.mbox/browser
 for previous discussion thread.

 I would also like to give a shout-out to Jan Filipiak who helped me out
 greatly in this project, and who led the initial work into this problem.
 Without Jan's help and insight I do not think this would have been possible
 to get to this point.

 Adam

>>>
>>
>


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-26 Thread Boyang Chen
Thanks Mayuresh and Jason for your follow-ups! Let me try to answer both in 
this reply.


>1. Do you intend to have member.id is a static config like member.name
>after KIP-345 and KIP-394?

No, we shall only rely on the broker to allocate member.id for the consumer
instances. FYI, I already started the discussion thread for KIP-394.

>2. Regarding "On client side, we add a new config called MEMBER_NAME in
>ConsumerConfig. On consumer service init, if the MEMBER_NAME config is
> set,
>we will put it in the initial join group request to identify itself as a
>static member (static membership); otherwise, we will still send
>UNKNOWN_MEMBER_ID to ask broker for allocating a new random ID (dynamic
>membership)."
>   - What is the value of member_id sent in the first JoinGroupRequest
>   when member_name is set (using static rebalance)? Is it
> UNKNOW_MEMBER_ID?

Yes, we could only use the unknown member id. Actually, this part of the
proposal is outdated; let me do another audit of the whole doc. Basically, it is
currently impossible to send `member.id` when the consumer is restarted. Sorry
for the confusion!
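To make this concrete, a minimal sketch of the proposed client-side configuration is below. The property key `member.name` is the name used in the KIP draft at this point of the discussion (Jason later suggests renaming it, e.g. to `group.instance.id`), so treat it as provisional rather than a released config; everything else uses the existing consumer API.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processor");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // Provisional key from the KIP draft: a stable per-instance name. When set, the first
    // JoinGroup still carries UNKNOWN_MEMBER_ID; the broker ties this name to a
    // broker-generated member.id (static membership).
    props.put("member.name", "payment-processor-instance-1");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);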

>3. Regarding "we are requiring member.id (if not unknown) to match the
>value stored in cache, otherwise reply MEMBER_ID_MISMATCH. The edge case
>that if we could have members with the same `member.name` (for example
>mis-configured instances with a valid member.id but added a used member
>name on runtime). When member name has duplicates, we could refuse join
>request from members with an outdated `member.id` (since we update the
>mapping upon each join group request). In an edge case where the client
>hits this exception in the response, it is suggesting that some other
>consumer takes its spot."
>   - The part of "some other consumer takes the spot" would be
>   intentional, right? Also when you say " The edge case that if we
>   could have members with the same `member.name` (for example
>   mis-configured instances *with a valid member.id 
> 
> *but
>   added a used member name on runtime).", what do you mean by *valid
>   member id* here? Does it mean that there exist a mapping of
>   member.name to member.id like *MemberA -> id1* on the
>   GroupCoordinator and this consumer is trying to join with *
> member.name
>   
> 
>  = MemberB and member.id 
> 
>  =
> id1 *
>   ?

I would take Jason's advice that each time we have an unknown member joining the
group, the broker will always assign a new and unique id to track its identity.
In this way, a consumer with a duplicate member name will be fenced.

>4. Depending on your explanation for point 2 and the point 3 above
>regarding returning back MEMBER_ID_MISMATCH on having a matching
>member_name but unknown member_id, if the consumer sends
> "UNKNOW_MEMBER_ID"
>on the first JoinGroupRequest and relies on the GroupCoordinator to
> give it
>a member_id, is the consumer suppose to remember member_id for
>joinGroupRequests? If yes, how are restarts handled?

As explained above, we shall not materialize the member.id. Instead we need to
rely on the broker to allocate a unique id for the consumer, just like what we
have now.

>5. Regarding "So in summary, *the member will only be removed due to
>session timeout*. We shall remove it from both in-memory static member
>name mapping and member list."
>   - If the rebalance is invoked manually using the the admin apis, how
>   long should the group coordinator wait for the members of the
> group to send
>   a JoinGroupRequest for participating in the rebalance? How is a
> lagging
>   consumer handled?

The plan is to disable member kick-out when rebalance.timeout is reached, so
basically we are not "waiting" for join group requests from existing members; we
shall just rebalance based on what we currently have within the group metadata.
A lagging consumer will trigger a rebalance later if session timeout > rebalance
timeout.

>6. Another detail to take care is that we need to automatically take the
>hash of group id so that we know which broker to send this request to.
>   - I assume this should be 

Jenkins build is back to normal : kafka-trunk-jdk11 #116

2018-11-26 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #3219

2018-11-26 Thread Apache Jenkins Server
See 


Changes:

[cmccabe] Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907)

--
[...truncated 2.38 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

> Task :streams:streams-scala:test

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion PASSED


Re: [DISCUSS] KIP-360: Improve handling of unknown producer

2018-11-26 Thread Jason Gustafson
Hey Guozhang,

Thanks for the comments. Responses below:

0. The new API is used between brokers, so we govern its usage using
`inter.broker.protocol.version`. If the other broker hasn't upgraded, we
will just fallback to the old logic, which is to accept the write. This is
similar to how we introduced the OffsetsForLeaderEpoch API. Does that seem
reasonable?

To tell the truth, after digging this KIP up and reading it over, I am
doubting how crucial this API is. It is attempting to protect a write from
a zombie which has just reset its sequence number after that producer had
had its state cleaned up. However, one of the other improvements in this
KIP is to maintain producer state beyond its retention in the log. I think
that makes this case sufficiently unlikely that we can leave it for future
work. I am not 100% sure this is the only scenario where transaction state
and log state can diverge anyway, so it would be better to consider this
problem more generally. What do you think?

1. Thanks, from memory, the term changed after the first iteration. I'll
make a pass and try to clarify usage.
2. I was attempting to handle some edge cases since this check would be
asynchronous. In any case, if we drop this validation as suggested above,
then we can ignore this.

-Jason



On Tue, Nov 13, 2018 at 6:23 PM Guozhang Wang  wrote:

> Hello Jason, thanks for the great write-up.
>
> 0. One question about the migration plan: "The new GetTransactionState API
> and the new version of the transaction state message will not be used until
> the inter-broker version supports it." I'm not so clear about the
> implementation details here: say a broker is on the newer version and the
> txn-coordinator is still on older version. Today the APIVersionsRequest can
> only help upgrade / downgrade the request version, but not forbidding
> sending any. Are you suggesting we add additional logic on the broker side
> to handle the case of "not sending the request"? If yes my concern is that
> this will be some tech-debt code that will live long before being removed.
>
> Some additional minor comments:
>
> 1. "last epoch" and "instance epoch" seem to be referring to the same thing
> in your wiki.
> 2. "The broker must verify after receiving the response that the producer
> state is still unknown.": not sure why we have to validate? If the metadata
> returned from the txn-coordinator can always be considered the
> source-of-truth, can't we just blindly use it to update the cache?
>
>
> Guozhang
>
>
>
> On Thu, Sep 6, 2018 at 9:10 PM Matthias J. Sax 
> wrote:
>
> > I am +1 on this :)
> >
> >
> > -Matthias
> >
> > On 9/4/18 9:55 AM, Jason Gustafson wrote:
> > > Bump. Thanks to Magnus for noticing that I forgot to link to the KIP:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-360%3A+Improve+handling+of+unknown+producer
> > > .
> > >
> > > -Jason
> > >
> > > On Tue, Aug 21, 2018 at 4:37 PM, Jason Gustafson 
> > wrote:
> > >
> > >> Hi All,
> > >>
> > >> I have a proposal to improve the transactional/idempotent producer's
> > >> handling of the UNKNOWN_PRODUCER error, which is the result of losing
> > >> producer state following segment removal. The current behavior is both
> > >> complex and limited. Please take a look and let me know what you
> think.
> > >>
> > >> Thanks in advance to Matthias Sax for feedback on the initial draft.
> > >>
> > >> -Jason
> > >>
> > >
> >
> >
>
> --
> -- Guozhang
>


[DISCUSS] KIP-394: Require member.id for initial join group request

2018-11-26 Thread Boyang Chen
Hey friends,


I would like to start a discussion thread for KIP-394, which is trying to
mitigate the broker cache bursting issue caused by anonymous join group requests:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request


Thanks!

Boyang


Re: [DISCUSS] KIP-391: Allow Producing with Offsets for Cluster Replication

2018-11-26 Thread Jason Gustafson
Another wrinkle to consider is KIP-320. If you are planning to replicate
__consumer_offsets directly, then you will have to account for leader epoch
information which is stored with the committed offsets. But I cannot think
how it would be possible to replicate the leader epoch information in
messages even if you can preserve offsets.

-Jason

On Mon, Nov 26, 2018 at 1:16 PM Mayuresh Gharat 
wrote:

> Hi Edoardo,
>
> Thanks a lot for the KIP.
>  I have a few questions/suggestions in addition to what Radai has mentioned
> above :
>
>1. Is this meant only for 1:1 replication, for example one Kafka cluster
>replicating to other, instead of having multiple Kafka clusters
> mirroring
>into one Kafka cluster?
>2. Are we relying on exactly once produce in the replicator? If not, how
>are retries handled in the replicator ?
>3. What is the recommended value for inflight requests, here. Is it
>suppose to be strictly 1, if yes, it would be great to mention that in
> the
>KIP.
>4. How is unclean Leader election between source cluster and destination
>cluster handled?
>5. How are offsets resets in case of the replicator's consumer handled?
>6. It would be good to explain the workflow in the KIP, with an
>example,  regarding how this KIP will change the replication scenario
> and
>how it will benefit the consumer apps.
>
> Thanks,
>
> Mayuresh
>
> On Mon, Nov 26, 2018 at 8:08 AM radai  wrote:
>
> > a few questions:
> >
> > 1. how do you handle possible duplications caused by the "special"
> > producer timing-out/retrying? are you explicitely relying on the
> > "exactly once" sequencing?
> > 2. what about the combination of log compacted topics + replicator
> > downtime? by the time the replicator comes back up there might be
> > "holes" in the source offsets (some msgs might have been compacted
> > out)? how is that recoverable?
> > 3. similarly, what if you try and fire up replication on a non-empty
> > source topic? does the kip allow for offsets starting at some
> > arbitrary X > 0 ? or would this have to be designed from the start.
> >
> > and lastly, since this KIP seems to be designed fro active-passive
> > failover (there can be no produce traffic except the replicator)
> > wouldnt a solution based on seeking to a time offset be more generic?
> > your producers could checkpoint the last (say log append) timestamp of
> > records theyve seen, and when restoring in the remote site seek to
> > those timestamps (which will be metadata in their committed offsets) -
> > assumming replication takes > 0 time you'd need to handle some dups,
> > but every kafka consumer setup needs to know how to handle those
> > anyway.
> > On Fri, Nov 23, 2018 at 2:27 AM Edoardo Comar  wrote:
> > >
> > > Hi Stanislav
> > >
> > > > > The flag is needed to distinguish a batch with a desired base
> offset
> > > of
> > > > 0,
> > > > from a regular batch for which offsets need to be generated.
> > > > If the producer can provide offsets, why not provide a base offset of
> > 0?
> > >
> > > a regular batch (for which offsets are generated by the broker on
> write)
> > > is sent with a base offset of 0.
> > > How could you distinguish it from a batch where you *want* the first
> > > record to be written at offset 0 (i.e. be the first in the partition
> and
> > > be rejected if there are records on the log already) ?
> > > We wanted to avoid a "deep" inspection (and potentially decompression)
> of
> > > the records.
> > >
> > > For the replicator use case, a single produce request where all the
> data
> > > is to be assumed with offset,
> > > or all without offsets, seems to suffice,
> > > So we added only a toplevel flag, not a per-topic-partition one.
> > >
> > > Thanks for your interest !
> > > cheers
> > > Edo
> > > --
> > >
> > > Edoardo Comar
> > >
> > > IBM Event Streams
> > > IBM UK Ltd, Hursley Park, SO21 2JN
> > >
> > >
> > > Stanislav Kozlovski  wrote on 22/11/2018
> > 22:32:42:
> > >
> > > > From: Stanislav Kozlovski 
> > > > To: dev@kafka.apache.org
> > > > Date: 22/11/2018 22:33
> > > > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for
> > > > Cluster Replication
> > > >
> > > > Hey Edo & Mickael,
> > > >
> > > > > The flag is needed to distinguish a batch with a desired base
> offset
> > > of
> > > > 0,
> > > > from a regular batch for which offsets need to be generated.
> > > > If the producer can provide offsets, why not provide a base offset of
> > 0?
> > > >
> > > > > (I am reading your post thinking about
> > > > partitions rather than topics).
> > > > Yes, I meant partitions. Sorry about that.
> > > >
> > > > Thanks for answering my questions :)
> > > >
> > > > Best,
> > > > Stanislav
> > > >
> > > > On Thu, Nov 22, 2018 at 5:28 PM Edoardo Comar 
> > wrote:
> > > >
> > > > > Hi Stanislav,
> > > > >
> > > > > you're right we envision the replicator use case to have a single
> > > producer
> > > > > with 

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-26 Thread Jason Gustafson
Hi Boyang,

Thanks for the updates. Looks like we're headed in the right direction and
clearly the interest that this KIP is receiving shows how strong the
motivation is!

I have a few questions:

1. This may be the same thing that Mayuresh is asking about. I think the
suggestion in the KIP is that if a consumer sends JoinGroup with a member
name, but no member id, then we will return the current member id
associated with that name. It seems in this case that we wouldn't be able
to protect from having two consumers active with the same configured
member.name? For example, imagine that we had a consumer with member.name=A
which is assigned member.id=1. Suppose it becomes a zombie and a new
instance starts up with member.name=A. If it is also assigned member.id=1,
then how can we detect the zombie if it comes back to life? Both instances
will have the same member.id.

The goal is to avoid a rebalance on a rolling restart, but we still need to
fence previous members. I am wondering if we can generate a new member.id
every time we receive a request from a static member with an unknown member
id. If the old instance with the same member.name attempts any operation,
then it will be fenced with an UNKNOWN_MEMBER_ID error. As long as the
subscription of the new instance hasn't changed, then we can skip the
rebalance and return the current assignment without forcing a rebalance.

The trick to making this work is in the error handling of the zombie
consumer. If the zombie simply resets its member.id and rejoins to get a
new one upon receiving the UNKNOWN_MEMBER_ID error, then it would end up
fencing the new member. We want to avoid this. There needs to be an
expectation for static members that the member.id of a static member will
not be changed except when a new member with the same member.name joins the
group. Then we can treat UNKNOWN_MEMBER_ID as a fatal error for consumers
with static member names.
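To illustrate the fencing behaviour described above, here is a rough, invented sketch of coordinator-side bookkeeping (not the real broker code; the class and map names are made up for illustration): a join with an unknown member.id mints a fresh id for the member.name, so any older instance still holding the previous id is fenced on its next request.

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    // Invented illustration of the coordinator logic sketched above; not broker code.
    class StaticMembershipSketch {
        // member.name -> currently valid member.id
        private final Map<String, String> staticMembers = new ConcurrentHashMap<>();

        // Handle a JoinGroup from a consumer configured with a static member name.
        String onStaticJoin(String memberName, String memberId) {
            if (memberId == null || memberId.isEmpty()) {
                // Unknown member.id: mint a fresh id and remember it for this name.
                // Any older instance still holding the previous id is now fenced.
                String newId = memberName + "-" + UUID.randomUUID();
                staticMembers.put(memberName, newId);
                return newId;
            }
            // Known member.id: it must match the latest id for this name; otherwise the
            // caller is a zombie and should see a fatal UNKNOWN_MEMBER_ID error.
            if (!memberId.equals(staticMembers.get(memberName))) {
                throw new IllegalStateException("UNKNOWN_MEMBER_ID (fenced)");
            }
            return memberId;
        }
    }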

2. The mechanics of the ConsumerRebalance API seem unclear to me. As far as
I understand it, it is used for scaling down a consumer group and somehow
bypasses normal session timeout expiration. I am wondering how critical
this piece is and whether we can leave it for future work. If not, then it
would be helpful to elaborate on its implementation. How would the
coordinator know which members to kick out of the group?

3. I've been holding back on mentioning this, but I think we should
reconsider the name `member.name`. I think we want something that suggests
its expectation of uniqueness in the group. How about `group.instance.id`
to go along with `group.id`?

Thanks,
Jason



On Mon, Nov 26, 2018 at 10:18 AM Mayuresh Gharat 
wrote:

> Hi Boyang,
>
> Thanks a lot for replying to all the queries and discussions here, so
> patiently.
> Really appreciate it.
>
> Had a few questions and suggestions after rereading the current version of
> the KIP :
>
>
>1. Do you intend to have member.id is a static config like member.name
>after KIP-345 and KIP-394?
>2. Regarding "On client side, we add a new config called MEMBER_NAME in
>ConsumerConfig. On consumer service init, if the MEMBER_NAME config is
> set,
>we will put it in the initial join group request to identify itself as a
>static member (static membership); otherwise, we will still send
>UNKNOWN_MEMBER_ID to ask broker for allocating a new random ID (dynamic
>membership)."
>   - What is the value of member_id sent in the first JoinGroupRequest
>   when member_name is set (using static rebalance)? Is it
> UNKNOW_MEMBER_ID?
>3. Regarding "we are requiring member.id (if not unknown) to match the
>value stored in cache, otherwise reply MEMBER_ID_MISMATCH. The edge case
>that if we could have members with the same `member.name` (for example
>mis-configured instances with a valid member.id but added a used member
>name on runtime). When member name has duplicates, we could refuse join
>request from members with an outdated `member.id` (since we update the
>mapping upon each join group request). In an edge case where the client
>hits this exception in the response, it is suggesting that some other
>consumer takes its spot."
>   - The part of "some other consumer takes the spot" would be
>   intentional, right? Also when you say " The edge case that if we
>   could have members with the same `member.name` (for example
>   mis-configured instances *with a valid member.id 
> *but
>   added a used member name on runtime).", what do you mean by *valid
>   member id* here? Does it mean that there exist a mapping of
>   member.name to member.id like *MemberA -> id1* on the
>   GroupCoordinator and this consumer is trying to join with *
> member.name
>    = MemberB and member.id  =
> id1 *
>   ?
>4. Depending on your explanation for point 2 and the point 3 above
>regarding returning back MEMBER_ID_MISMATCH 

[jira] [Created] (KAFKA-7676) Kafka compile failed with Gradle 5.0

2018-11-26 Thread Jiangtao Liu (JIRA)
Jiangtao Liu created KAFKA-7676:
---

 Summary: Kafka compile failed with Gradle 5.0
 Key: KAFKA-7676
 URL: https://issues.apache.org/jira/browse/KAFKA-7676
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangtao Liu


The README (https://github.com/apache/kafka) states that Kafka requires Gradle 4.7 or higher.

However, when I try to use Gradle 5.0, the build fails with the following compile error:
{noformat}
org.gradle.api.ProjectConfigurationException: A problem occurred configuring 
root project 'kafka'.
at 
org.gradle.configuration.project.LifecycleProjectEvaluator.wrapException(LifecycleProjectEvaluator.java:80)
at 
org.gradle.configuration.project.LifecycleProjectEvaluator.addConfigurationFailure(LifecycleProjectEvaluator.java:73)
at 
org.gradle.configuration.project.LifecycleProjectEvaluator.access$600(LifecycleProjectEvaluator.java:54)
at 
org.gradle.configuration.project.LifecycleProjectEvaluator$EvaluateProject$1.run(LifecycleProjectEvaluator.java:109)
at org.gradle.internal.Factories$1.create(Factories.java:25)
at 
org.gradle.internal.work.DefaultWorkerLeaseService.withLocks(DefaultWorkerLeaseService.java:183)
at 
org.gradle.internal.work.StopShieldingWorkerLeaseService.withLocks(StopShieldingWorkerLeaseService.java:40)
at 
org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withProjectLock(DefaultProjectStateRegistry.java:226)
at 
org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:220)
at 
org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:186)
at 
org.gradle.configuration.project.LifecycleProjectEvaluator$EvaluateProject.run(LifecycleProjectEvaluator.java:96)
at 
org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:301)
at 
org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:293)
at 
org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:175)
at 
org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:91)
at 
org.gradle.internal.operations.DelegatingBuildOperationExecutor.run(DelegatingBuildOperationExecutor.java:31)
at 
org.gradle.configuration.project.LifecycleProjectEvaluator.evaluate(LifecycleProjectEvaluator.java:68)
at 
org.gradle.api.internal.project.DefaultProject.evaluate(DefaultProject.java:687)
at 
org.gradle.api.internal.project.DefaultProject.evaluate(DefaultProject.java:140)
at 
org.gradle.execution.TaskPathProjectEvaluator.configure(TaskPathProjectEvaluator.java:35)
at 
org.gradle.execution.TaskPathProjectEvaluator.configureHierarchy(TaskPathProjectEvaluator.java:60)
at 
org.gradle.configuration.DefaultBuildConfigurer.configure(DefaultBuildConfigurer.java:41)
at 
org.gradle.initialization.DefaultGradleLauncher$ConfigureBuild.run(DefaultGradleLauncher.java:286)
at 
org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:301)
at 
org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:293)
at 
org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:175)
at 
org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:91)
at 
org.gradle.internal.operations.DelegatingBuildOperationExecutor.run(DelegatingBuildOperationExecutor.java:31)
at 
org.gradle.initialization.DefaultGradleLauncher.configureBuild(DefaultGradleLauncher.java:194)
at 
org.gradle.initialization.DefaultGradleLauncher.doBuildStages(DefaultGradleLauncher.java:150)
at 
org.gradle.initialization.DefaultGradleLauncher.getConfiguredBuild(DefaultGradleLauncher.java:128)
at 
org.gradle.internal.invocation.GradleBuildController$2.execute(GradleBuildController.java:88)
at 
org.gradle.internal.invocation.GradleBuildController$2.execute(GradleBuildController.java:85)
at 
org.gradle.internal.invocation.GradleBuildController$3.create(GradleBuildController.java:103)
at 
org.gradle.internal.invocation.GradleBuildController$3.create(GradleBuildController.java:96)
at 
org.gradle.internal.work.DefaultWorkerLeaseService.withLocks(DefaultWorkerLeaseService.java:183)
at 
org.gradle.internal.work.StopShieldingWorkerLeaseService.withLocks(StopShieldingWorkerLeaseService.java:40)
at 
org.gradle.internal.invocation.GradleBuildController.doBuild(GradleBuildController.java:96)
at 
org.gradle.internal.invocation.GradleBuildController.configure(GradleBuildController.java:85)
at 

Re: [DISCUSS] KIP-391: Allow Producing with Offsets for Cluster Replication

2018-11-26 Thread Mayuresh Gharat
Hi Edoardo,

Thanks a lot for the KIP.
I have a few questions/suggestions in addition to what Radai has mentioned
above:

   1. Is this meant only for 1:1 replication, for example one Kafka cluster
   replicating to another, instead of having multiple Kafka clusters mirroring
   into one Kafka cluster?
   2. Are we relying on exactly-once produce in the replicator? If not, how
   are retries handled in the replicator?
   3. What is the recommended value for in-flight requests here? Is it
   supposed to be strictly 1? If yes, it would be great to mention that in the
   KIP.
   4. How is unclean leader election between the source cluster and destination
   cluster handled?
   5. How are offset resets handled in the case of the replicator's consumer?
   6. It would be good to explain the workflow in the KIP, with an
   example, regarding how this KIP will change the replication scenario and
   how it will benefit the consumer apps.

Thanks,

Mayuresh

On Mon, Nov 26, 2018 at 8:08 AM radai  wrote:

> a few questions:
>
> 1. how do you handle possible duplications caused by the "special"
> producer timing-out/retrying? are you explicitely relying on the
> "exactly once" sequencing?
> 2. what about the combination of log compacted topics + replicator
> downtime? by the time the replicator comes back up there might be
> "holes" in the source offsets (some msgs might have been compacted
> out)? how is that recoverable?
> 3. similarly, what if you try and fire up replication on a non-empty
> source topic? does the kip allow for offsets starting at some
> arbitrary X > 0 ? or would this have to be designed from the start.
>
> and lastly, since this KIP seems to be designed fro active-passive
> failover (there can be no produce traffic except the replicator)
> wouldnt a solution based on seeking to a time offset be more generic?
> your producers could checkpoint the last (say log append) timestamp of
> records theyve seen, and when restoring in the remote site seek to
> those timestamps (which will be metadata in their committed offsets) -
> assumming replication takes > 0 time you'd need to handle some dups,
> but every kafka consumer setup needs to know how to handle those
> anyway.
> On Fri, Nov 23, 2018 at 2:27 AM Edoardo Comar  wrote:
> >
> > Hi Stanislav
> >
> > > > The flag is needed to distinguish a batch with a desired base offset
> > of
> > > 0,
> > > from a regular batch for which offsets need to be generated.
> > > If the producer can provide offsets, why not provide a base offset of
> 0?
> >
> > a regular batch (for which offsets are generated by the broker on write)
> > is sent with a base offset of 0.
> > How could you distinguish it from a batch where you *want* the first
> > record to be written at offset 0 (i.e. be the first in the partition and
> > be rejected if there are records on the log already) ?
> > We wanted to avoid a "deep" inspection (and potentially decompression) of
> > the records.
> >
> > For the replicator use case, a single produce request where all the data
> > is to be assumed with offset,
> > or all without offsets, seems to suffice,
> > So we added only a toplevel flag, not a per-topic-partition one.
> >
> > Thanks for your interest !
> > cheers
> > Edo
> > --
> >
> > Edoardo Comar
> >
> > IBM Event Streams
> > IBM UK Ltd, Hursley Park, SO21 2JN
> >
> >
> > Stanislav Kozlovski  wrote on 22/11/2018
> 22:32:42:
> >
> > > From: Stanislav Kozlovski 
> > > To: dev@kafka.apache.org
> > > Date: 22/11/2018 22:33
> > > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for
> > > Cluster Replication
> > >
> > > Hey Edo & Mickael,
> > >
> > > > The flag is needed to distinguish a batch with a desired base offset
> > of
> > > 0,
> > > from a regular batch for which offsets need to be generated.
> > > If the producer can provide offsets, why not provide a base offset of
> 0?
> > >
> > > > (I am reading your post thinking about
> > > partitions rather than topics).
> > > Yes, I meant partitions. Sorry about that.
> > >
> > > Thanks for answering my questions :)
> > >
> > > Best,
> > > Stanislav
> > >
> > > On Thu, Nov 22, 2018 at 5:28 PM Edoardo Comar 
> wrote:
> > >
> > > > Hi Stanislav,
> > > >
> > > > you're right we envision the replicator use case to have a single
> > producer
> > > > with offsets per partition (I am reading your post thinking about
> > > > partitions rather than topics).
> > > >
> > > > If a regular producer was to send its own records at the same time,
> > it's
> > > > very likely that the one sending with an offset will fail because of
> > > > invalid offsets.
> > > > Same if two producers were sending with offsets, likely both would
> > then
> > > > fail.
> > > >
> > > > > Does it make sense to *lock* the topic from other producers while
> > there
> > > > is
> > > > > one that uses offsets?
> > > >
> > > > You could do that with ACL permissions if you wanted, I don't think
> it
> > > > needs to be mandated by 

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-11-26 Thread xiongqi wu
Thanks for binding and non-binding votes.
Can I get one more binding vote?

Thanks in advance!

Xiongqi (Wesley) Wu


On Wed, Nov 14, 2018 at 7:29 PM Matt Farmer  wrote:

> I'm a +1 (non-binding) — This looks like it would have saved us a lot of
> pain in an issue we had to debug recently. I can't go into details, but
> figuring out how to achieve this effect gave me quite a headache. :)
>
> On Mon, Nov 12, 2018 at 1:00 PM xiongqi wu  wrote:
>
> > Hi all,
> >
> > Can I have one more vote on this KIP?
> > Any comment is appreciated.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Add+a+Maximum+Log+Compaction+Lag
> >
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Fri, Nov 9, 2018 at 7:56 PM xiongqi wu  wrote:
> >
> > > Thanks Dong.
> > > I have updated the KIP.
> > >
> > > Xiongqi (Wesley) Wu
> > >
> > >
> > > On Fri, Nov 9, 2018 at 5:31 PM Dong Lin  wrote:
> > >
> > >> Thanks for the KIP Xiongqi. LGTM. +1 (binding)
> > >>
> > >> One minor comment: it may be a bit better to clarify in the public
> > >> interface section that the value of the newly added metric is determined
> > >> by applying that formula across all compactable segments. For
> > >> example:
> > >>
> > >> The maximum value of Math.max(now -
> > >> earliest_timestamp_in_ms_of_uncompacted_segment - max.compaction.lag.ms, 0)/1000
> > >> across all compactable partitions, where the max.compaction.lag.ms
> > >> can be overridden on per-topic basis.
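A small standalone sketch of that computation may help; the type and field names below are invented for illustration (the real logic lives in the broker's log cleaner), but the formula is exactly the one quoted above.

    import java.util.List;

    // Invented names for illustration; not the broker implementation.
    class CompactionDelaySketch {
        static class CompactablePartition {
            long earliestTimestampOfUncompactedSegmentMs;
            long maxCompactionLagMs;   // effective max.compaction.lag.ms (per-topic override applies)
        }

        // Maximum compaction delay, in seconds, across all compactable partitions.
        static long maxCompactionDelaySecs(List<CompactablePartition> partitions) {
            long now = System.currentTimeMillis();
            long maxDelaySecs = 0L;
            for (CompactablePartition p : partitions) {
                long lagMs = now - p.earliestTimestampOfUncompactedSegmentMs - p.maxCompactionLagMs;
                maxDelaySecs = Math.max(maxDelaySecs, Math.max(lagMs, 0L) / 1000L);
            }
            return maxDelaySecs;
        }
    }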
> > >>
> > >>
> > >>
> > >> On Fri, Nov 9, 2018 at 5:16 PM xiongqi wu 
> wrote:
> > >>
> > >> > Thanks Joel.
> > >> > Tracking the delay at second granularity makes sense
> > >> > I have updated KIP.
> > >> >
> > >> > Xiongqi (Wesley) Wu
> > >> >
> > >> >
> > >> > On Fri, Nov 9, 2018 at 5:05 PM Joel Koshy 
> > wrote:
> > >> >
> > >> > > +1 with one suggestion on the proposed metric. You should probably
> > >> > include
> > >> > > the unit. So for e.g., max-compaction-delay-secs.
> > >> > >
> > >> > > Joel
> > >> > >
> > >> > > On Tue, Nov 6, 2018 at 5:30 PM xiongqi wu 
> > >> wrote:
> > >> > >
> > >> > > > bump
> > >> > > > Xiongqi (Wesley) Wu
> > >> > > >
> > >> > > >
> > >> > > > On Thu, Sep 27, 2018 at 4:20 PM xiongqi wu  >
> > >> > wrote:
> > >> > > >
> > >> > > > >
> > >> > > > > Thanks Eno, Brett, Dong, Guozhang, Colin,  and Xiaohe for
> > >> feedback.
> > >> > > > > Can I have more feedback or VOTE on this KIP?
> > >> > > > >
> > >> > > > >
> > >> > > > > Xiongqi (Wesley) Wu
> > >> > > > >
> > >> > > > >
> > >> > > > > On Wed, Sep 19, 2018 at 10:52 AM xiongqi wu <
> > xiongq...@gmail.com>
> > >> > > wrote:
> > >> > > > >
> > >> > > > >> Any other votes or comments?
> > >> > > > >>
> > >> > > > >> Xiongqi (Wesley) Wu
> > >> > > > >>
> > >> > > > >>
> > >> > > > >> On Tue, Sep 11, 2018 at 11:45 AM xiongqi wu <
> > xiongq...@gmail.com
> > >> >
> > >> > > > wrote:
> > >> > > > >>
> > >> > > > >>> Yes, more votes and code review.
> > >> > > > >>>
> > >> > > > >>> Xiongqi (Wesley) Wu
> > >> > > > >>>
> > >> > > > >>>
> > >> > > > >>> On Mon, Sep 10, 2018 at 11:37 PM Brett Rann
> > >> > >  > >> > > > >
> > >> > > > >>> wrote:
> > >> > > > >>>
> > >> > > >  +1 (non binding) from on 0 then, and on the KIP.
> > >> > > > 
> > >> > > >  Where do we go from here? More votes?
> > >> > > > 
> > >> > > >  On Tue, Sep 11, 2018 at 5:34 AM Colin McCabe <
> > >> cmcc...@apache.org>
> > >> > > >  wrote:
> > >> > > > 
> > >> > > >  > On Mon, Sep 10, 2018, at 11:44, xiongqi wu wrote:
> > >> > > >  > > Thank you for comments. I will use '0' for now.
> > >> > > >  > >
> > >> > > >  > > If we create topics through admin client in the future,
> > we
> > >> > might
> > >> > > >  perform
> > >> > > >  > > some useful checks. (but the assumption is all brokers
> in
> > >> the
> > >> > > same
> > >> > > >  > cluster
> > >> > > >  > > have the same default configurations value,
> otherwise,it
> > >> might
> > >> > > >  still be
> > >> > > >  > > tricky to do such cross validation check.)
> > >> > > >  >
> > >> > > >  > This isn't something that we might do in the future--
> this
> > is
> > >> > > >  something we
> > >> > > >  > are doing now. We already have Create Topic policies
> which
> > >> are
> > >> > > >  enforced by
> > >> > > >  > the broker. Check KIP-108 and KIP-170 for details. This
> is
> > >> one
> > >> > of
> > >> > > > the
> > >> > > >  > motivations for getting rid of direct ZK access-- making
> > sure
> > >> > that
> > >> > > >  these
> > >> > > >  > policies are applied.
> > >> > > >  >
> > >> > > >  > I agree that having different configurations on different
> > >> > brokers
> > >> > > > can
> > >> > > >  be
> > >> > > >  > confusing and frustrating . That's why more
> configurations
> > >> are
> > >> > > being
> > >> > > >  made
> > >> > > >  > dynamic using KIP-226. Dynamic configurations are stored
> > >> > centrally
> > >> > > > in
> 

Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2018-11-26 Thread Jason Gustafson
Hi Stanislav,

Thanks for the KIP. Can you clarify the compatibility impact here? What
will happen to groups that are already larger than the max size? Also, just
to be clear, the resource we are trying to conserve here is what? Memory?

-Jason

On Mon, Nov 26, 2018 at 2:44 AM Boyang Chen  wrote:

> Thanks Stanislav for the update! One suggestion I have is that it would be
> helpful to put your reasoning on deciding the current default value. For
> example, in certain use cases at Pinterest we are very likely to have more
> consumers than 250 when we configure 8 stream instances with 32 threads.
>
>
> For the effectiveness of this KIP, we should encourage people to discuss
> their opinions on the default setting and ideally reach a consensus.
>
>
> Best,
>
> Boyang
>
> 
> From: Stanislav Kozlovski 
> Sent: Monday, November 26, 2018 6:14 PM
> To: dev@kafka.apache.org
> Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> metadata growth
>
> Hey everybody,
>
> It's been a week since this KIP and not much discussion has been made.
> I assume that this is a straight forward change and I will open a voting
> thread in the next couple of days if nobody has anything to suggest.
>
> Best,
> Stanislav
>
> On Thu, Nov 22, 2018 at 12:56 PM Stanislav Kozlovski <
> stanis...@confluent.io>
> wrote:
>
> > Greetings everybody,
> >
> > I have enriched the KIP a bit with a bigger Motivation section and also
> > renamed it.
> > KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit
> >
> > I'm looking forward to discussions around it.
> >
> > Best,
> > Stanislav
> >
> > On Tue, Nov 20, 2018 at 1:47 PM Stanislav Kozlovski <
> > stanis...@confluent.io> wrote:
> >
> >> Hey there everybody,
> >>
> >> Thanks for the introduction Boyang. I appreciate the effort you are
> >> putting into improving consumer behavior in Kafka.
> >>
> >> @Matt
> >> I also believe the default value is high. In my opinion, we should aim
> to
> >> a default cap around 250. This is because in the current model any
> consumer
> >> rebalance is disrupting to every consumer. The bigger the group, the
> longer
> >> this period of disruption.
> >>
> >> If you have such a large consumer group, chances are that your
> >> client-side logic could be structured better and that you are not using
> the
> >> high number of consumers to achieve high throughput.
> >> 250 can still be considered of a high upper bound, I believe in practice
> >> users should aim to not go over 100 consumers per consumer group.
> >>
> >> In regards to the cap being global/per-broker, I think that we should
> >> consider whether we want it to be global or *per-topic*. For the time
> >> being, I believe that having it per-topic with a global default might be
> >> the best situation. Having it global only seems a bit restricting to me
> and
> >> it never hurts to support more fine-grained configurability (given it's
> the
> >> same config, not a new one being introduced).
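For concreteness, assuming the cap lands as a broker-level setting named after the KIP title, the configuration could look like the line below; the value follows the 250 suggested above (note that Boyang's earlier example of 8 streams instances x 32 threads = 256 consumers would already exceed it). Any per-group or per-topic override syntax is still an open question in this thread and is not shown.

    # Hypothetical broker setting (server.properties); name taken from the KIP title
    group.max.size=250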
> >>
> >> On Tue, Nov 20, 2018 at 11:32 AM Boyang Chen 
> wrote:
> >>
> >>> Thanks Matt for the suggestion! I'm still open to any suggestion to
> >>> change the default value. Meanwhile I just want to point out that this
> >>> value is a just last line of defense, not a real scenario we would
> expect.
> >>>
> >>>
> >>> In the meanwhile, I discussed with Stanislav and he would be driving
> the
> >>> 389 effort from now on. Stanislav proposed the idea in the first place
> and
> >>> had already come up a draft design, while I will keep focusing on
> KIP-345
> >>> effort to ensure solving the edge case described in the JIRA<
> >>>
> >>> https://issues.apache.org/jira/browse/KAFKA-7610
> >.
> >>>
> >>>
> >>> Thank you Stanislav for making this happen!
> >>>
> >>>
> >>> Boyang
> >>>
> >>> 
> >>> From: Matt Farmer 
> >>> Sent: Tuesday, November 20, 2018 10:24 AM
> >>> To: dev@kafka.apache.org
> >>> Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> >>> metadata growth
> >>>
> >>> Thanks for the KIP.
> >>>
> >>> Will this cap be a global cap across the entire cluster or per broker?
> >>>
> >>> Either way the default value seems a bit high to me, but that could
> just
> >>> be
> >>> from my own usage patterns. I’d have probably started with 500 or 1k
> but
> >>> could be easily convinced that’s wrong.
> >>>
> >>> Thanks,
> >>> Matt
> >>>
> >>> On Mon, Nov 19, 2018 at 

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-26 Thread Mayuresh Gharat
Hi Boyang,

Thanks a lot for replying to all the queries and discussions here, so
patiently.
Really appreciate it.

Had a few questions and suggestions after rereading the current version of
the KIP :


   1. Do you intend to have member.id as a static config like member.name
   after KIP-345 and KIP-394?
   2. Regarding "On client side, we add a new config called MEMBER_NAME in
   ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set,
   we will put it in the initial join group request to identify itself as a
   static member (static membership); otherwise, we will still send
   UNKNOWN_MEMBER_ID to ask broker for allocating a new random ID (dynamic
   membership)."
  - What is the value of member_id sent in the first JoinGroupRequest
  when member_name is set (using static rebalance)? Is it UNKNOWN_MEMBER_ID?
   3. Regarding "we are requiring member.id (if not unknown) to match the
   value stored in cache, otherwise reply MEMBER_ID_MISMATCH. The edge case
   that if we could have members with the same `member.name` (for example
   mis-configured instances with a valid member.id but added a used member
   name on runtime). When member name has duplicates, we could refuse join
   request from members with an outdated `member.id` (since we update the
   mapping upon each join group request). In an edge case where the client
   hits this exception in the response, it is suggesting that some other
   consumer takes its spot."
  - The part of "some other consumer takes the spot" would be
  intentional, right? Also when you say " The edge case that if we
  could have members with the same `member.name` (for example
  mis-configured instances *with a valid member.id  *but
  added a used member name on runtime).", what do you mean by *valid
  member id* here? Does it mean that there exist a mapping of
  member.name to member.id like *MemberA -> id1* on the
  GroupCoordinator and this consumer is trying to join with *member.name
   = MemberB and member.id  = id1 *
  ?
   4. Depending on your explanation for point 2 and the point 3 above
   regarding returning back MEMBER_ID_MISMATCH on having a matching
   member_name but unknown member_id, if the consumer sends "UNKNOW_MEMBER_ID"
   on the first JoinGroupRequest and relies on the GroupCoordinator to give it
   a member_id, is the consumer suppose to remember member_id for
   joinGroupRequests? If yes, how are restarts handled?
   5. Regarding "So in summary, *the member will only be removed due to
   session timeout*. We shall remove it from both in-memory static member
   name mapping and member list."
  - If the rebalance is invoked manually using the admin APIs, how long
  should the group coordinator wait for the members of the group to send
  a JoinGroupRequest for participating in the rebalance? How is a lagging
  consumer handled?
   6. Another detail to take care of is that we need to automatically take the
   hash of the group id so that we know which broker to send this request to.
  - I assume this should be the same as the way we find the coordinator
  today, right? If yes, should we specify it in the KIP? (See the lookup
  sketch after this list.)
   7. Are there any specific failure scenarios when you say "other
   potential failure cases."? It would be good to mention them explicitly, if
   you think there are any.
   8. It would be good to have a rollback plan as you have for roll forward
   in the KIP.
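For reference on question 6: today the coordinator for a group is found by hashing the group id onto a partition of the internal __consumer_offsets topic, and the leader of that partition acts as the group coordinator. A minimal sketch of that mapping (standalone Java, not the broker's actual code) is:

    // Maps a group id to a __consumer_offsets partition; the partition leader is the
    // group coordinator. Roughly mirrors the existing lookup (abs(hash) mod partition count).
    static int coordinatorPartitionFor(String groupId, int offsetsTopicPartitionCount) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
    }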

Thanks,

Mayuresh

On Mon, Nov 26, 2018 at 8:17 AM Mayuresh Gharat 
wrote:

> Hi Boyang,
>
> Do you have a discuss thread for KIP-394 that you mentioned here ?
>
> Thanks,
>
> Mayuresh
>
> On Mon, Nov 26, 2018 at 4:52 AM Boyang Chen  wrote:
>
>> Hey Dong, thanks for the follow-up here!
>>
>>
>> 1) It is not very clear to the user what is the difference between
>> member.name and client.id as both seems to be used to identify the
>> consumer. I am wondering if it would be more intuitive to name it
>> group.member.name (preferred choice since it matches the current group.id
>> config name) or rebalance.member.name to explicitly show that the id is
>> solely used for rebalance.
>> Great question. I feel `member.name` is enough to explain itself, it
>> seems not very
>> helpful to make the config name longer. Comparing `name` with `id` gives
>> user the
>> impression that they have the control over it with customized rule than
>> library decided.
>>
>> 2) In the interface change section it is said that
>> GroupMaxSessionTimeoutMs
>> will be changed to 30 minutes. It seems to suggest that we will change the
>> default value of this config. It does not seem necessary to increase the
>> time of consumer failure detection when user doesn't use static
>> membership.
>> Also, say static membership is enabled, then this default config change
>> will cause a partition to be unavailable for consumption for 30 minutes if
>> there is hard consumer failure, which 

Re: [VOTE] KIP-351: Add --under-min-isr option to describe TopicCommand

2018-11-26 Thread Mickael Maison
+1 (non-binding)
Thanks for the KIP!
On Mon, Nov 26, 2018 at 4:32 PM Kevin Lu  wrote:
>
> Hi All,
>
> I'm bumping this thread as it has been a couple weeks with no activity.
>
> The proposed changes in this KIP are minor, but are extremely helpful for
> operators to immediately identify partitions under min ISR. Please take a
> couple minutes to review and provide a vote.
>
> Thanks~
>
> Regards,
> Kevin
>
> On Thu, Nov 8, 2018 at 1:07 AM Kevin Lu  wrote:
>
> > Hi All,
> >
> > I'm starting the vote thread for KIP-351: Add --under-min-isr option to
> > describe topics command.
> >
> > KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-351%3A+Add+--under-min-isr+option+to+describe+topics+command
> >
> > Discussion thread:
> > https://lists.apache.org/thread.html/90d1652ebc03a7be4100dd101b92a7dcefe63d144856c5f6c132381b@%3Cdev.kafka.apache.org%3E
> >
> > Thanks!
> >
> > Regards,
> > Kevin
> >
> >


[jira] [Created] (KAFKA-7675) Trogdor CLI - Support filter arguments in the '--show-tasks' command

2018-11-26 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7675:
--

 Summary: Trogdor CLI - Support filter arguments in the 
'--show-tasks' command
 Key: KAFKA-7675
 URL: https://issues.apache.org/jira/browse/KAFKA-7675
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


CoordinatorClient's --show-tasks currently does not take any parameters. The 
request it sends behind the curtains has 6 fields which enable filtering of the 
output - 'firstStartMs', 'lastStartMs', 'firstEndMs', 'lastEndMs', 'state' and 
'taskIds'. 
All of them should be exposed to enable users to filter appropriately



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7674) Quick Start Guide steps for managing topics do not work

2018-11-26 Thread Nathan Long (JIRA)
Nathan Long created KAFKA-7674:
--

 Summary: Quick Start Guide steps for managing topics do not work
 Key: KAFKA-7674
 URL: https://issues.apache.org/jira/browse/KAFKA-7674
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 2.1.0
 Environment: MacOS 10.13.6
Reporter: Nathan Long


I'm following the [Apache Kafka Quick Start 
Guide](https://kafka.apache.org/quickstart) exactly, using [the release linked 
at the 
top](https://www.apache.org/dyn/closer.cgi?path=/kafka/2.0.0/kafka_2.11-2.0.0.tgz),
 which is versioned as "kafka_2.11-2.0.0.tgz".

At "Step 3: Create a Topic", it says to run this to create a topic:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test

The next step is to list the existing topics:

    bin/kafka-topics.sh --list --zookeeper localhost:2181

The documentation shows that `test` should be listed.

The list command returns no output for me. I'm using the default configuration 
with `log.dirs=/tmp/kafka-logs`, and looking in that directory, I see no change 
when I run the command above to create a topic.

When I publish the first message to the topic, I see it fail to send the 
message, then successfully retry after the topic is auto-created. I'm then able 
to see the message when I run the console consumer:

    bin/kafka-console-consumer.sh --bootstrap-server \
  localhost:9092 --topic test --from-beginning

But even though the topic now clearly exists, when I list topics, it still 
finds nothing:

    bin/kafka-topics.sh --list --zookeeper localhost:2181

I don't know if this is a documentation problem or a bug, but since I get no
errors when running any of these commands, it seems like a bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-351: Add --under-min-isr option to describe TopicCommand

2018-11-26 Thread Kevin Lu
Hi All,

I'm bumping this thread as it has been a couple weeks with no activity.

The proposed changes in this KIP are minor, but are extremely helpful for
operators to immediately identify partitions under min ISR. Please take a
couple minutes to review and provide a vote.
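For reference, based on the KIP title the proposed usage would presumably mirror the existing --under-replicated-partitions filter on the describe command, i.e. something like:

    bin/kafka-topics.sh --describe --under-min-isr --zookeeper localhost:2181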

Thanks~

Regards,
Kevin

On Thu, Nov 8, 2018 at 1:07 AM Kevin Lu  wrote:

> Hi All,
>
> I'm starting the vote thread for KIP-351: Add --under-min-isr option to
> describe topics command.
>
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-351%3A+Add+--under-min-isr+option+to+describe+topics+command
>
> Discussion thread:
> https://lists.apache.org/thread.html/90d1652ebc03a7be4100dd101b92a7dcefe63d144856c5f6c132381b@%3Cdev.kafka.apache.org%3E
>
> Thanks!
>
> Regards,
> Kevin
>
>


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-26 Thread Mayuresh Gharat
Hi Boyang,

Do you have a discuss thread for KIP-394 that you mentioned here ?

Thanks,

Mayuresh

On Mon, Nov 26, 2018 at 4:52 AM Boyang Chen  wrote:

> Hey Dong, thanks for the follow-up here!
>
>
> 1) It is not very clear to the user what is the difference between
> member.name and client.id as both seems to be used to identify the
> consumer. I am wondering if it would be more intuitive to name it
> group.member.name (preferred choice since it matches the current group.id
> config name) or rebalance.member.name to explicitly show that the id is
> solely used for rebalance.
> Great question. I feel `member.name` is enough to explain itself; it seems
> not very helpful to make the config name longer. Comparing `name` with `id`
> gives the user the impression that they have control over it with a
> customized rule rather than one decided by the library.
>
> 2) In the interface change section it is said that GroupMaxSessionTimeoutMs
> will be changed to 30 minutes. It seems to suggest that we will change the
> default value of this config. It does not seem necessary to increase the
> time of consumer failure detection when user doesn't use static membership.
> Also, say static membership is enabled, then this default config change
> will cause a partition to be unavailable for consumption for 30 minutes if
> there is hard consumer failure, which seems to be worse experience than
> having unnecessary rebalance (when this timeout is small), particularly for
> new users of Kafka. Could you explain more why we should make this change?
> We are not changing the default session timeout value. We are just changing
> the cap we are enforcing on the session timeout max value. So this change is
> not affecting what kind of membership the end user is using, and loosening
> the cap gives the end user more flexibility on the trade-off between
> liveness and stability.
>
> 3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER
> into one error? It seems that these two errors are currently handled by the
> consumer in the same way. And we don't also don't expect MEMBER_ID_MISMATCH
> to happen. Thus it is not clear what is the benefit of having two errors.
> I agree that we should remove DUPLICATE_STATIC_MEMBER error because with
> the KIP-394<
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member+id+for+initial+join+group+request
> >
> we will automatically fence all join requests with UNKNOWN_MEMBER_ID.
>
> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains
> member name which is already in the consumer group, however the member id
> was missing". After a consumer is restarted, it will send a
> JoinGroupRequest with an existing memberName (as the coordinator has not
> expired this member from the memory) and memberId
> = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted
> across consumer restart in the consumer side). Does it mean that
> JoinGroupRequest from a newly restarted consumer will always be rejected
> until the sessionTimeoutMs has passed?
> Same answer as question 3). This part of the logic shall be removed from
> the proposal.
>
> 5) It seems that we always add two methods to the interface
> org.apache.kafka.clients.admin.AdminClient.java, one with options and the
> other without options. Could this be specified in the interface change
> section?
> Sounds good! Added both methods.
>
> 6) Do we plan to have an off-the-shelf command line tool for SREs to trigger
> a rebalance? If so, we probably want to specify the command line tool
> interface similar to
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts
> .
> Added the script.
>
> 7) Would it be simpler to replace the name "forceStaticRebalance" with
> "invokeConsumerRebalance"? It is not very clear what extra meaning the word
> "force" carries as compared to "trigger" or "invoke". And it seems simpler
> to allow this API to trigger a rebalance regardless of whether the consumer is
> configured with memberName.
> Sounds good. Right now I feel for both static and dynamic membership it is
> more manageable to introduce the consumer rebalance method through the admin
> client API.
>
> 8) It is not very clear how the newly added AdminClient API triggers a
> rebalance. For example, does it send a request? Can this be explained in the
> KIP?
>
> Sure, I will add more details to the API.
>
>
> Thanks again for the helpful suggestions!
>
>
> Best,
> Boyang
>
> 
> From: Dong Lin 
> Sent: Saturday, November 24, 2018 2:54 PM
> To: dev
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying 

Re: [DISCUSS] KIP-391: Allow Producing with Offsets for Cluster Replication

2018-11-26 Thread radai
a few questions:

1. how do you handle possible duplications caused by the "special"
producer timing-out/retrying? are you explicitly relying on the
"exactly once" sequencing?
2. what about the combination of log-compacted topics + replicator
downtime? by the time the replicator comes back up there might be
"holes" in the source offsets (some msgs might have been compacted
out)? how is that recoverable?
3. similarly, what if you try and fire up replication on a non-empty
source topic? does the kip allow for offsets starting at some
arbitrary X > 0? or would this have to be designed in from the start?

and lastly, since this KIP seems to be designed for active-passive
failover (there can be no produce traffic except the replicator),
wouldn't a solution based on seeking to a time offset be more generic?
your producers could checkpoint the last (say log append) timestamp of
records they've seen, and when restoring in the remote site seek to
those timestamps (which will be metadata in their committed offsets) -
assuming replication takes > 0 time you'd need to handle some dups,
but every kafka consumer setup needs to know how to handle those
anyway.
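
to illustrate that last point, here's a rough sketch of the timestamp-based
restore using the existing KafkaConsumer#offsetsForTimes api - the broker
address, topic, group and checkpointed timestamp below are all made up:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class TimestampFailoverSeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "remote-site:9092"); // hypothetical passive-site broker
        props.put("group.id", "failover-consumers");        // hypothetical group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // last log-append timestamp that was checkpointed in the source site
        long lastSeenLogAppendTime = 1543190400000L;
        TopicPartition tp = new TopicPartition("my-topic", 0);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            // find the earliest offset whose timestamp is >= the checkpoint
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, lastSeenLogAppendTime));
            OffsetAndTimestamp found = offsets.get(tp);
            if (found != null) {
                // may replay a few records around the checkpoint, so processing must tolerate dups
                consumer.seek(tp, found.offset());
            }
            // a null result means no record at/after that timestamp - fall back to
            // seekToBeginning/seekToEnd as appropriate
            consumer.poll(Duration.ofMillis(100));
        }
    }
}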
On Fri, Nov 23, 2018 at 2:27 AM Edoardo Comar  wrote:
>
> Hi Stanislav
>
> > > The flag is needed to distinguish a batch with a desired base offset
> of
> > 0,
> > from a regular batch for which offsets need to be generated.
> > If the producer can provide offsets, why not provide a base offset of 0?
>
> a regular batch (for which offsets are generated by the broker on write)
> is sent with a base offset of 0.
> How could you distinguish it from a batch where you *want* the first
> record to be written at offset 0 (i.e. be the first in the partition and
> be rejected if there are records on the log already) ?
> We wanted to avoid a "deep" inspection (and potentially decompression) of
> the records.
>
> For the replicator use case, a single produce request where either all the data
> carries offsets, or none of it does, seems to suffice,
> so we added only a top-level flag, not a per-topic-partition one.
>
> Thanks for your interest !
> cheers
> Edo
> --
>
> Edoardo Comar
>
> IBM Event Streams
> IBM UK Ltd, Hursley Park, SO21 2JN
>
>
> Stanislav Kozlovski  wrote on 22/11/2018 22:32:42:
>
> > From: Stanislav Kozlovski 
> > To: dev@kafka.apache.org
> > Date: 22/11/2018 22:33
> > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for
> > Cluster Replication
> >
> > Hey Edo & Mickael,
> >
> > > The flag is needed to distinguish a batch with a desired base offset
> of
> > 0,
> > from a regular batch for which offsets need to be generated.
> > If the producer can provide offsets, why not provide a base offset of 0?
> >
> > > (I am reading your post thinking about
> > partitions rather than topics).
> > Yes, I meant partitions. Sorry about that.
> >
> > Thanks for answering my questions :)
> >
> > Best,
> > Stanislav
> >
> > On Thu, Nov 22, 2018 at 5:28 PM Edoardo Comar  wrote:
> >
> > > Hi Stanislav,
> > >
> > > you're right we envision the replicator use case to have a single
> producer
> > > with offsets per partition (I am reading your post thinking about
> > > partitions rather than topics).
> > >
> > > If a regular producer was to send its own records at the same time,
> it's
> > > very likely that the one sending with an offset will fail because of
> > > invalid offsets.
> > > Same if two producers were sending with offsets, likely both would
> then
> > > fail.
> > >
> > > > Does it make sense to *lock* the topic from other producers while
> there
> > > is
> > > > one that uses offsets?
> > >
> > > You could do that with ACL permissions if you wanted, I don't think it
> > > needs to be mandated by changing the broker logic.
> > >
> > >
> > > > Since we are tying the produce-with-offset request to the ACL, do we
> > > need
> > > > the `use_offset` field in the produce request? Maybe we make it
> > > mandatory
> > > > for produce requests with that ACL to have offsets.
> > >
> > > The flag is needed to distinguish a batch with a desired base offset
> of 0,
> > > from a regular batch for which offsets need to be generated.
> > > I would not restrict a principal to only send-with-offsets (by making
> that
> > > mandatory via the ACL).
> > >
> > > Thanks
> > > Edo & Mickael
> > >
> > > --
> > >
> > > Edoardo Comar
> > >
> > > IBM Event Streams
> > > IBM UK Ltd, Hursley Park, SO21 2JN
> > >
> > >
> > > Stanislav Kozlovski  wrote on 22/11/2018
> 16:17:11:
> > >
> > > > From: Stanislav Kozlovski 
> > > > To: dev@kafka.apache.org
> > > > Date: 22/11/2018 16:17
> > > > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for
> > > > Cluster Replication
> > > >
> > > > Hey Edoardo, thanks for the KIP!
> > > >
> > > > I have some questions, apologies if they are naive:
> > > > Is this intended to work for a single producer use case only?
> > > > How would it 

[jira] [Created] (KAFKA-7673) Upgrade RocksDB to include fix for WinEnvIO::GetSectorSize

2018-11-26 Thread Yanick Salzmann (JIRA)
Yanick Salzmann created KAFKA-7673:
--

 Summary: Upgrade RocksDB to include fix for WinEnvIO::GetSectorSize
 Key: KAFKA-7673
 URL: https://issues.apache.org/jira/browse/KAFKA-7673
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.0
Reporter: Yanick Salzmann


The following fix would make it possible to run Kafka Streams applications on
Windows 7 (right now an application using Kafka Streams fails to start there):

[https://github.com/facebook/rocksdb/commit/9c7da963bc8b3df8f3ed3865f00dd7c483267ac0]

According to the tags, it would require an upgrade to one of the versions below:
 * [v5.17.2|https://github.com/facebook/rocksdb/releases/tag/v5.17.2]
 * [v5.16.6|https://github.com/facebook/rocksdb/releases/tag/v5.16.6]
 * [v5.15.10|https://github.com/facebook/rocksdb/releases/tag/v5.15.10]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-26 Thread Boyang Chen
Hey Dong, thanks for the follow-up here!


1) It is not very clear to the user what is the difference between
member.name and client.id as both seem to be used to identify the
consumer. I am wondering if it would be more intuitive to name it
group.member.name (preferred choice since it matches the current group.id
config name) or rebalance.member.name to explicitly show that the id is
solely used for rebalance.
Great question. I feel `member.name` is enough to explain itself, so it does
not seem very helpful to make the config name longer. Comparing `name` with
`id`, `name` gives the user the impression that they control the value with
their own naming rule, rather than having the library decide it.
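
To make the distinction concrete, here is a minimal sketch of the three configs
side by side (the values are made up, and "member.name" is only the string
proposed in this KIP - there is no such ConsumerConfig constant today):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class StaticMemberConfigExample {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // group.id: which consumer group this instance joins (group management).
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payments-processing");
        // client.id: free-form label used for logging, metrics and quotas; no rebalance semantics.
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "payments-host-1");
        // member.name (proposed by KIP-345, not in any released client): a stable,
        // user-chosen name per instance so a restarted consumer rejoins as the same
        // static member instead of triggering a rebalance.
        props.put("member.name", "payments-host-1");
        return props;
    }
}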

2) In the interface change section it is said that GroupMaxSessionTimeoutMs
will be changed to 30 minutes. It seems to suggest that we will change the
default value of this config. It does not seem necessary to increase the
time of consumer failure detection when the user doesn't use static membership.
Also, say static membership is enabled, then this default config change
will cause a partition to be unavailable for consumption for 30 minutes if
there is a hard consumer failure, which seems to be a worse experience than
having unnecessary rebalances (when this timeout is small), particularly for
new users of Kafka. Could you explain more why we should make this change?
We are not changing the default session timeout value. We are only changing the
cap we enforce on the maximum session timeout. So this change does not affect
which kind of membership the end user is using, and loosening the cap gives the
end user more flexibility in the trade-off between liveness and stability.
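
To make the interplay concrete, a small consumer-side sketch (the 10-minute
value is purely illustrative, not a proposed default):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class SessionTimeoutExample {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // A static member may pick a long session timeout to ride out restarts
        // without triggering a rebalance, e.g. 10 minutes here.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);
        // The broker rejects session timeouts above its group.max.session.timeout.ms;
        // this KIP only raises the default of that broker-side cap (to 30 minutes),
        // it does not change the consumer's default session.timeout.ms.
        return props;
    }
}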

3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER
into one error? It seems that these two errors are currently handled by the
consumer in the same way. And we also don't expect MEMBER_ID_MISMATCH
to happen. Thus it is not clear what is the benefit of having two errors.
I agree that we should remove the DUPLICATE_STATIC_MEMBER error because with
KIP-394 we will automatically fence all join requests with UNKNOWN_MEMBER_ID.

4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains
member name which is already in the consumer group, however the member id
was missing". After a consumer is restarted, it will send a
JoinGroupRequest with an existing memberName (as the coordinator has not
expired this member from the memory) and memberId
= JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted
across consumer restart in the consumer side). Does it mean that
JoinGroupRequest from a newly restarted consumer will always be rejected
until the sessionTimeoutMs has passed?
Same answer as question 3). This part of the logic shall be removed from the 
proposal.

5) It seems that we always add two methods to the interface
org.apache.kafka.clients.admin.AdminClient.java, one with options and the
other without options. Could this be specified in the interface change
section?
Sounds good! Added both methods.

6) Do we plan to have an off-the-shelf command line tool for SREs to trigger
a rebalance? If so, we probably want to specify the command line tool
interface similar to
https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts
.
Added the script.

7) Would it be simpler to replace the name "forceStaticRebalance" with
"invokeConsumerRebalance"? It is not very clear what extra meaning the word
"force" carries as compared to "trigger" or "invoke". And it seems simpler
to allow this API to trigger a rebalance regardless of whether the consumer is
configured with memberName.
Sounds good. Right now I feel for both static and dynamic membership it is
more manageable to introduce the consumer rebalance method through the admin
client API.

8) It is not very clear how the newly added AdminClient API triggers a
rebalance. For example, does it send a request? Can this be explained in the
KIP?

Sure, I will add more details to the API.
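
To sketch what I have in mind for 5), 7) and 8) - all names below are
placeholders following the invokeConsumerRebalance suggestion above, not
existing Kafka API, and the final signatures would be pinned down in the KIP:

// A compilable sketch of the two proposed AdminClient overloads.
public interface ProposedAdminClientExtensions {

    /** Options carrier, following the *Options pattern of existing AdminClient calls. */
    class InvokeConsumerRebalanceOptions {
        private Integer timeoutMs; // optional per-call timeout, like other *Options classes
        public InvokeConsumerRebalanceOptions timeoutMs(Integer timeoutMs) {
            this.timeoutMs = timeoutMs;
            return this;
        }
        public Integer timeoutMs() {
            return timeoutMs;
        }
    }

    /** Result carrier, following the *Result pattern of existing AdminClient calls. */
    interface InvokeConsumerRebalanceResult { }

    /** Convenience overload that delegates to the options variant with defaults. */
    default InvokeConsumerRebalanceResult invokeConsumerRebalance(String groupId) {
        return invokeConsumerRebalance(groupId, new InvokeConsumerRebalanceOptions());
    }

    /**
     * Full overload. The implementation would send a request to the group
     * coordinator of the given group, and the coordinator would then kick off
     * a rebalance for that group.
     */
    InvokeConsumerRebalanceResult invokeConsumerRebalance(String groupId,
                                                          InvokeConsumerRebalanceOptions options);
}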


Thanks again for the helpful suggestions!


Best,
Boyang


From: Dong Lin 
Sent: Saturday, November 24, 2018 2:54 PM
To: dev
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hey Boyang,

Thanks for the update! Here are some followup comments:

1) It is not very clear to the user what is the difference between
member.name and client.id as both seem to be used to identify the
consumer. I am wondering if it would be more intuitive to name it
group.member.name (preferred choice since it 

Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2018-11-26 Thread Boyang Chen
Thanks Stanislav for the update! One suggestion: it would be helpful to put your
reasoning for the chosen default value in the KIP. For example, in certain use
cases at Pinterest we are very likely to have more than 250 consumers, since
configuring 8 stream instances with 32 threads each already adds up to 256.

For this KIP to be effective, we should encourage people to discuss their
opinions on the default setting and ideally reach a consensus.
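
As an illustration of why the default matters for a deployment like ours:
assuming the limit ends up as a dynamically updatable broker config named
group.max.size (both the name and the dynamic update are assumptions at this
point), an operator who needs a larger group could override it through the
admin client, e.g.:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RaiseGroupMaxSize {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // hypothetical
        try (AdminClient admin = AdminClient.create(props)) {
            // "group.max.size" is the name proposed by this KIP, not an existing broker config.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config desired = new Config(
                    Collections.singletonList(new ConfigEntry("group.max.size", "500")));
            // Note: alterConfigs is non-incremental - it replaces the dynamic config set
            // for the resource, so in practice any other dynamic broker configs you want
            // to keep must be included in the same call.
            admin.alterConfigs(Collections.singletonMap(broker, desired)).all().get();
        }
    }
}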


Best,

Boyang


From: Stanislav Kozlovski 
Sent: Monday, November 26, 2018 6:14 PM
To: dev@kafka.apache.org
Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata 
growth

Hey everybody,

It's been a week since this KIP was proposed and there has not been much
discussion. I assume that this is a straightforward change, and I will open a
voting thread in the next couple of days if nobody has anything to suggest.

Best,
Stanislav

On Thu, Nov 22, 2018 at 12:56 PM Stanislav Kozlovski 
wrote:

> Greetings everybody,
>
> I have enriched the KIP a bit with a bigger Motivation section and also
> renamed it.
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit
>
> I'm looking forward to discussions around it.
>
> Best,
> Stanislav
>
> On Tue, Nov 20, 2018 at 1:47 PM Stanislav Kozlovski <
> stanis...@confluent.io> wrote:
>
>> Hey there everybody,
>>
>> Thanks for the introduction Boyang. I appreciate the effort you are
>> putting into improving consumer behavior in Kafka.
>>
>> @Matt
>> I also believe the default value is high. In my opinion, we should aim to
>> a default cap around 250. This is because in the current model any consumer
>> rebalance is disrupting to every consumer. The bigger the group, the longer
>> this period of disruption.
>>
>> If you have such a large consumer group, chances are that your
>> client-side logic could be structured better and that you are not using the
>> high number of consumers to achieve high throughput.
>> 250 can still be considered of a high upper bound, I believe in practice
>> users should aim to not go over 100 consumers per consumer group.
>>
>> In regards to the cap being global/per-broker, I think that we should
>> consider whether we want it to be global or *per-topic*. For the time
>> being, I believe that having it per-topic with a global default might be
>> the best situation. Having it global only seems a bit restricting to me and
>> it never hurts to support more fine-grained configurability (given it's the
>> same config, not a new one being introduced).
>>
>> On Tue, Nov 20, 2018 at 11:32 AM Boyang Chen  wrote:
>>
>>> Thanks Matt for the suggestion! I'm still open to any suggestion to
>>> change the default value. Meanwhile I just want to point out that this
>>> value is just a last line of defense, not a real scenario we would expect.
>>>
>>>
>>> In the meanwhile, I discussed with Stanislav and he would be driving the
>>> 389 effort from now on. Stanislav proposed the idea in the first place and
>>> had already come up with a draft design, while I will keep focusing on KIP-345
>>> effort to ensure solving the edge case described in the JIRA<
>>> https://issues.apache.org/jira/browse/KAFKA-7610>.
>>>
>>>
>>> Thank you Stanislav for making this happen!
>>>
>>>
>>> Boyang
>>>
>>> 
>>> From: Matt Farmer 
>>> Sent: Tuesday, November 20, 2018 10:24 AM
>>> To: dev@kafka.apache.org
>>> Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
>>> metadata growth
>>>
>>> Thanks for the KIP.
>>>
>>> Will this cap be a global cap across the entire cluster or per broker?
>>>
>>> Either way the default value seems a bit high to me, but that could just
>>> be
>>> from my own usage patterns. I’d have probably started with 500 or 1k but
>>> could be easily convinced that’s wrong.
>>>
>>> Thanks,
>>> Matt
>>>
>>> On Mon, Nov 19, 2018 at 8:51 PM Boyang Chen  wrote:
>>>
>>> > Hey folks,
>>> >
>>> >
>>> > I would like to start a discussion on KIP-389:
>>> >
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Enforce+group.max.size+to+cap+member+metadata+growth
>>> >
>>> >
>>> > This is a pretty simple change to cap the 

Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2018-11-26 Thread Stanislav Kozlovski
Hey everybody,

It's been a week since this KIP was proposed and there has not been much
discussion. I assume that this is a straightforward change, and I will open a
voting thread in the next couple of days if nobody has anything to suggest.

Best,
Stanislav

On Thu, Nov 22, 2018 at 12:56 PM Stanislav Kozlovski 
wrote:

> Greetings everybody,
>
> I have enriched the KIP a bit with a bigger Motivation section and also
> renamed it.
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit
>
> I'm looking forward to discussions around it.
>
> Best,
> Stanislav
>
> On Tue, Nov 20, 2018 at 1:47 PM Stanislav Kozlovski <
> stanis...@confluent.io> wrote:
>
>> Hey there everybody,
>>
>> Thanks for the introduction Boyang. I appreciate the effort you are
>> putting into improving consumer behavior in Kafka.
>>
>> @Matt
>> I also believe the default value is high. In my opinion, we should aim for
>> a default cap of around 250. This is because in the current model any consumer
>> rebalance is disruptive to every consumer. The bigger the group, the longer
>> this period of disruption.
>>
>> If you have such a large consumer group, chances are that your
>> client-side logic could be structured better and that you are not using the
>> high number of consumers to achieve high throughput.
>> 250 can still be considered a high upper bound; I believe in practice
>> users should aim not to go over 100 consumers per consumer group.
>>
>> In regards to the cap being global/per-broker, I think that we should
>> consider whether we want it to be global or *per-topic*. For the time
>> being, I believe that having it per-topic with a global default might be
>> the best situation. Having it global only seems a bit restricting to me and
>> it never hurts to support more fine-grained configurability (given it's the
>> same config, not a new one being introduced).
>>
>> On Tue, Nov 20, 2018 at 11:32 AM Boyang Chen  wrote:
>>
>>> Thanks Matt for the suggestion! I'm still open to any suggestion to
>>> change the default value. Meanwhile I just want to point out that this
>>> value is just a last line of defense, not a real scenario we would expect.
>>>
>>>
>>> In the meanwhile, I discussed with Stanislav and he would be driving the
>>> 389 effort from now on. Stanislav proposed the idea in the first place and
>>> had already come up with a draft design, while I will keep focusing on KIP-345
>>> effort to ensure solving the edge case described in the JIRA<
>>> https://issues.apache.org/jira/browse/KAFKA-7610>.
>>>
>>>
>>> Thank you Stanislav for making this happen!
>>>
>>>
>>> Boyang
>>>
>>> 
>>> From: Matt Farmer 
>>> Sent: Tuesday, November 20, 2018 10:24 AM
>>> To: dev@kafka.apache.org
>>> Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
>>> metadata growth
>>>
>>> Thanks for the KIP.
>>>
>>> Will this cap be a global cap across the entire cluster or per broker?
>>>
>>> Either way the default value seems a bit high to me, but that could just
>>> be
>>> from my own usage patterns. I’d have probably started with 500 or 1k but
>>> could be easily convinced that’s wrong.
>>>
>>> Thanks,
>>> Matt
>>>
>>> On Mon, Nov 19, 2018 at 8:51 PM Boyang Chen  wrote:
>>>
>>> > Hey folks,
>>> >
>>> >
>>> > I would like to start a discussion on KIP-389:
>>> >
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Enforce+group.max.size+to+cap+member+metadata+growth
>>> >
>>> >
>>> > This is a pretty simple change to cap the consumer group size for
>>> broker
>>> > stability. Give me your valuable feedback when you got time.
>>> >
>>> >
>>> > Thank you!
>>> >
>>>
>>
>>
>> --
>> Best,
>> Stanislav
>>
>
>
> --
> Best,
> Stanislav
>


-- 
Best,
Stanislav