Re: [VOTE] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

2017-08-10 Thread Manikumar
+1 (non-binding)

On Fri, Aug 11, 2017 at 5:31 AM, Jun Rao  wrote:

> Hi, Vahid,
>
> Thanks for the KIP. +1 from me.
>
> Jun
>
> On Wed, Aug 2, 2017 at 2:40 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com
> > wrote:
>
> > Hi all,
> >
> > Thanks to everyone who participated in the discussion on KIP-163, and
> > provided feedback.
> > The KIP can be found at
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch
> > .
> > I believe the concerns have been addressed in the current version of the
> > KIP; so I'd like to start a vote.
> >
> > Thanks.
> > --Vahid
> >
> >
> >
>


Re: [DISCUSS] KIP-185: Make exactly once in order delivery per partition the default producer setting

2017-08-10 Thread Apurva Mehta
Hi Dong,

Thanks for your comments.

Yes, with retries=MAX_INT, producer.flush() may block. I think there are
two solutions: a good one would be to adopt some form of KIP-91 to bound
the time a message can remain unacknowledged. Alternately, we could set the
default retries to 10 or something. I prefer implementing KIP-91 along with
this KIP to solve this problem, but it isn't a strong dependency.

Yes, OutOfOrderSequence is a new exception. It indicates a previously
acknowledged message was lost. This could happen even today, but there is
no way for the client to detect it. With KIP-98 and the new sequence
numbers, we can. If applications ignore it, they would have the same
behavior as they already have, except with the explicit knowledge that
something has been lost.

Finally, from my perspective, the best reason to make acks=all the
default is that it would be a coherent default to have. Along with enabling
idempotence, acks=all, and retries=MAX_INT would mean that acknowledged
messages would appear in the log exactly once. The 'fatal' exceptions would
be either AuthorizationExceptions, ConfigExceptions, or rare data loss
issues due to concurrent failures or software bugs. So while this is not a
guarantee of exactly once, it is practically as close to it as you can get.
I think this is a strong enough reason to enable acks=all.
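
For reference, a minimal sketch of what the proposed defaults amount to if a
user were to set them explicitly today (the property names are existing
producer configs; only the default values would change under this KIP, and the
bootstrap address is a placeholder):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder address
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // explicit equivalents of the proposed defaults:
    props.put("enable.idempotence", "true");
    props.put("acks", "all");
    props.put("retries", Integer.toString(Integer.MAX_VALUE));
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);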

Thanks,
Apurva


On Thu, Aug 10, 2017 at 1:04 AM, Dong Lin  wrote:

> Hey Apurva,
>
> Thanks for the KIP. I have read through the KIP and the prior discussion in
> this thread. I have three concerns that are related to Becket's comments:
>
> - Is it true that, as Becket has mentioned, producer.flush() may block
> infinitely if retries=MAX_INT? This seems like a possible way to break
> users' applications. I think we should probably avoid causing correctness
> penalties for applications.
>
> - It seems that OutOfOrderSequenceException will be a new exception thrown
> to user after this config change. Can you clarify whether this will cause
> correctness penalty for application?
>
> - It is not very clear to me whether the benefit of increasing acks from 1
> to all is worth the performance hit. For users who have not already
> overridden acks to all, it is very likely that they are not already doing
> other complicated work (e.g. close producer in callback) that are necessary
> for exactly-once delivery. Thus those users won't have exactly-once
> semantics by simply picking up the change in the default acks
> configuration. It seems that the only benefit of this config change is the
> well-known tradeoff between performance and message loss rate. I am not
> sure this is a strong reason to risk reducing existing user's performance.
>
> I think my point is that we should not make changes that will break
> users' existing applications. And we should try to avoid reducing users'
> performance unless there is a strong benefit to doing so (e.g. exactly-once).
>
> Thanks,
> Dong
>
>
>
>
> On Wed, Aug 9, 2017 at 10:43 PM, Apurva Mehta  wrote:
>
> > Thanks for your email Becket.
> >
> > Your observations around using acks=1 and acks=-1 are correct. Do note
> that
> > getting an OutOfOrderSequence means that acknowledged data has been lost.
> > This could be due to a weaker acks setting like acks=1 or due to a topic
> > which is not configured to handle broker failures cleanly (unclean leader
> > election is enabled, etc.). Either way, you are right in observing that
> if
> > an app is very serious about having exactly one copy of each ack'd
> message
> > in the log, it is a significant effort to recover from this error.
> >
> > However, I propose an alternate way of thinking about this: is it
> > worthwhile shipping Kafka with the defaults tuned for strong semantics?
> > That is essentially what is being proposed here, and of course there will
> > be tradeoffs with performance and deployment costs-- you can't have your
> > cake and eat it too.
> >
> > And if we want to ship Kafka with strong semantics by default, we might
> > want to make the default topic level settings as well as the client
> > settings more robust. This means, for instance, disabling unclean leader
> > election by default. If there are other configs we need to change on the
> > broker side to ensure that ack'd messages are not lost due to transient
> > failures, we should change those as well as part of a future KIP.
> >
> > Personally, I think that the defaults should provide robust guarantees.
> >
> > And this brings me to another point: these are just proposed defaults.
> > Nothing is being taken away in terms of flexibility to tune for different
> > behavior.
> >
> > Finally, the way idempotence is implemented means that there needs to be
> > some cap on max.in.flight when idempotence is enabled -- that is just a
> > tradeoff of the feature. Do we have any data that there are installations
> > which benefit greatly for a value of max.in.flight > 5? For instance,
> > LinkedIn probably has the largest and most demand

[jira] [Created] (KAFKA-5724) AbstractPartitionAssignor does not take into consideration that partition number may start from non-zero

2017-08-10 Thread Allen Wang (JIRA)
Allen Wang created KAFKA-5724:
-

 Summary: AbstractPartitionAssignor does not take into 
consideration that partition number may start from non-zero
 Key: KAFKA-5724
 URL: https://issues.apache.org/jira/browse/KAFKA-5724
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 0.11.0.0
Reporter: Allen Wang


In AbstractPartitionAssignor.assign(Cluster metadata, Map<String, Subscription> 
subscriptions), it invokes assign(partitionsPerTopic, subscriptions). It 
assumes that partition numbers start from 0, and it constructs TopicPartition 
in the range of [0, partitionsPerTopic).

This assumption is not correct. The correct way to handle it is to follow the 
same approach as the producer's DefaultPartitioner, where it uses [0, 
numberOfPartitions) as an index into the actual partition list.

There are use cases where partition number may not start from zero. It can 
happen if users use advanced tooling to manually craft the partition number 
when creating topics.
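
A minimal sketch of the suggested direction, assuming the assignor has access 
to the Cluster metadata (the helper below is illustrative, not the actual 
patch):

    import java.util.List;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    final class PartitionLookup {
        // Treat i as an index into the topic's real partition list, the way the
        // producer's DefaultPartitioner does, instead of assuming ids 0..n-1.
        static TopicPartition nthPartition(Cluster metadata, String topic, int i) {
            List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
            PartitionInfo info = partitions.get(i % partitions.size());
            return new TopicPartition(topic, info.partition());
        }
    }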




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-183 - Change PreferredReplicaLeaderElectionCommand to use AdminClient

2017-08-10 Thread Jun Rao
Hi, Tom,

Thanks for the KIP. Looks good overall. A few minor comments.

1. In most requests with topic partitions, we nest partitions inside topic.
So, instead of [topic partition_id], we do [topic, [partition_id]] to save
some space.

2. The preferred leader election just tries to move the leader to the
preferred replica. This may not succeed if the preferred replica is not
in-sync yet. It would be useful to return an error code to reflect that.

3. REPLICA_LEADER_ELECTION_IN_PROGRESS: Now that the controller is single
threaded, is that just for timeout? If so, perhaps we can just return the
Timeout error code.

4. Permission wise, it seems this should require ClusterAction on Cluster
resource since it's changing things at the cluster level.

Thanks,

Jun



On Wed, Aug 9, 2017 at 6:16 AM, Ismael Juma  wrote:

> On Wed, Aug 9, 2017 at 11:40 AM, Tom Bentley 
> wrote:
> >
> > There are responses with detailed error messages as well as the codes,
> > CreateTopicsResponse, {Describe|Alter}ConfigsResponse, and the responses
> > for managing ACLs for instance. To be honest, I assumed including a
> message
> > was the norm. In this case, however, I don't think there is any extra
> > detail to include beyond the error itself, so I've removed it from the
> KIP.
> >
>
> We started sending back actual error messages when we introduced the Create
> Topic Policy as the errors are custom and much more helpful if a string can
> be sent back.
>
> In general, I think it would be better if we had an optional error message
> for all request types as error codes alone sometimes result in people
> having to check the broker logs. Examples that come to mind are Group
> Coordinator Not Available and Invalid Request. For the latter, we want to
> include what was invalid and it's not practical to have an error code for
> every possible validation error (this becomes more obvious once we start
> adding admin protocol APIs).
>
> Ismael
>


Re: [DISCUSS] KIP-186: Increase offsets retention default to 7 days

2017-08-10 Thread Guozhang Wang
+1 from me

On Wed, Aug 9, 2017 at 9:40 AM, Jason Gustafson  wrote:

> +1 on the bump to 7 days. Wanted to mention one minor point. The
> OffsetCommit RPC still provides the ability to set the retention time from
> the client, but we do not use it in the consumer. Should we consider adding
> a consumer config to set this? Given the problems people had with the old
> default, such a config would probably have gotten a fair bit of use. Maybe
> it's less necessary with the new default, but there may be situations where
> you don't want to keep the offsets for too long. For example, the console
> consumer commits offsets with a generated group id. We might want to set a
> low retention time to keep it from filling the offset cache with garbage
> from such groups.
>

I agree with Jason here, but maybe that itself deserves a separate KIP
discussion.
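
For context, the change being voted on amounts to raising the existing 
broker-side default; set explicitly it would look like this (a sketch using 
the current config name; 10080 minutes = 7 days, versus today's default of 
1440):

    import java.util.Properties;

    Properties brokerProps = new Properties();
    brokerProps.put("offsets.retention.minutes", "10080");  // proposed new default: 7 days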


>
> -Jason
>
> On Wed, Aug 9, 2017 at 5:24 AM, Sönke Liebau <
> soenke.lie...@opencore.com.invalid> wrote:
>
> > Just had this create issues at a customer as well, +1
> >
> > On Wed, Aug 9, 2017 at 11:46 AM, Mickael Maison <
> mickael.mai...@gmail.com>
> > wrote:
> >
> > > Yes the current default is too short, +1
> > >
> > > On Wed, Aug 9, 2017 at 8:56 AM, Ismael Juma  wrote:
> > > > Thanks for the KIP, +1 from me.
> > > >
> > > > Ismael
> > > >
> > > > On Wed, Aug 9, 2017 at 1:24 AM, Ewen Cheslack-Postava <
> > e...@confluent.io
> > > >
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I posted a simple new KIP for a problem we see with a lot of users:
> > > >> KIP-186: Increase offsets retention default to 7 days
> > > >>
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >> 186%3A+Increase+offsets+retention+default+to+7+days
> > > >>
> > > >> Note that in addition to the KIP text itself, the linked JIRA
> already
> > > >> existed and has a bunch of discussion on the subject.
> > > >>
> > > >> -Ewen
> > > >>
> > >
> >
> >
> >
> > --
> > Sönke Liebau
> > Partner
> > Tel. +49 179 7940878
> > OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
> >
>



-- 
-- Guozhang


Re: [VOTE] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

2017-08-10 Thread Jun Rao
Hi, Vahid,

Thanks for the KIP. +1 from me.

Jun

On Wed, Aug 2, 2017 at 2:40 PM, Vahid S Hashemian  wrote:

> Hi all,
>
> Thanks to everyone who participated in the discussion on KIP-163, and
> provided feedback.
> The KIP can be found at
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch
> .
> I believe the concerns have been addressed in the current version of the
> KIP; so I'd like to start a vote.
>
> Thanks.
> --Vahid
>
>
>


Re: [DISCUSS] KIP-186: Increase offsets retention default to 7 days

2017-08-10 Thread James Cheng
+1 from me!

-James

> On Aug 8, 2017, at 5:24 PM, Ewen Cheslack-Postava  wrote:
> 
> Hi all,
> 
> I posted a simple new KIP for a problem we see with a lot of users:
> KIP-186: Increase offsets retention default to 7 days
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days
> 
> Note that in addition to the KIP text itself, the linked JIRA already
> existed and has a bunch of discussion on the subject.
> 
> -Ewen



Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-10 Thread Jun Rao
Hi, Sumant,

Thanks for the KIP. Nice documentation on all current issues with the
timeout.

You also brought up a good use case for timing out a message. For
applications that collect and send sensor data to Kafka, if the data can't
be sent to Kafka for some reason, the application may prefer to buffer the
more recent data in the accumulator. Without a timeout, the accumulator
will be filled with old records and new records can't be added.

Your proposal makes sense for a developer who is familiar with how the
producer works. I am not sure if this is very intuitive to the users since
it may not be very easy for them to figure out how to configure the new
knob to bound the amount of time until a message is completed.

From the users' perspective, Apurva's suggestion of
max.message.delivery.wait.ms (which
bounds the time from when a message enters the accumulator to the time when the
callback is called) seems more intuitive. You listed this in the rejected
section since it requires additional logic to rebatch when a produce
request expires. However, this may not be too bad. The following are the
things that we have to do.

1. The clock starts when a batch is created.
2. If the batch can't be drained within max.message.delivery.wait.ms, all
messages in the batch will fail and the callback will be called.
3. When sending a produce request, we calculate an expireTime for the
request that equals to the remaining expiration time for the oldest batch
in the request.
4. We set the minimum of the expireTime of all inflight requests as the
timeout in the selector poll call (so that the selector can wake up before
the expiration time).
5. If the produce response can't be received within expireTime, we expire
all batches in the produce request whose expiration time has been reached.
For the rest of the batches, we resend them in a new produce request.
6. If the producer response has a retriable error, we just backoff a bit
and then retry the produce request as today. The number of retries doesn't
really matter now. We just keep retrying until the expiration time is
reached. It's possible that a produce request is never retried due to
expiration. However, this seems the right thing to do since the users want
to timeout the message at this time.

Implementation wise, there will be a bit more complexity in step 3 and 4,
but probably not too bad. The benefit is that this is more intuitive to the
end user.
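
As a rough sketch of steps 1-5 above (the names and the config value are
purely illustrative, not the actual accumulator code):

    // Sketch only: per-batch bookkeeping for max.message.delivery.wait.ms.
    final class BatchExpiry {
        static final long MAX_DELIVERY_WAIT_MS = 120_000L;  // stand-in value

        // step 2: the clock starts at batch creation; fail the batch once the bound elapses
        static boolean isExpired(long batchCreatedMs, long nowMs) {
            return nowMs - batchCreatedMs >= MAX_DELIVERY_WAIT_MS;
        }

        // step 3: a produce request's expireTime is the remaining time of its oldest batch
        static long requestExpireTimeMs(long oldestBatchCreatedMs, long nowMs) {
            return Math.max(0L, MAX_DELIVERY_WAIT_MS - (nowMs - oldestBatchCreatedMs));
        }
    }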

Does that sound reasonable to you?

Thanks,

Jun


On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe  wrote:

> On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta  wrote:
>
> > > > There seems to be no relationship with cluster metadata availability
> or
> > > > staleness. Expiry is just based on the time since the batch has been
> > > ready.
> > > > Please correct me if I am wrong.
> > > >
> > >
> > > I was not very specific about where we do expiration. I glossed over
> some
> > > details because (again) we've other mechanisms to detect non progress.
> > The
> > > condition (!muted.contains(tp) && (isMetadataStale ||
> > > > cluster.leaderFor(tp) == null)) is used in
> > > RecordAccumualtor.expiredBatches:
> > > https://github.com/apache/kafka/blob/trunk/clients/src/
> > > main/java/org/apache/kafka/clients/producer/internals/
> > > RecordAccumulator.java#L443
> > >
> > >
> > > Effectively, we expire in all the following cases
> > > 1) producer is partitioned from the brokers. When metadata age grows
> > beyond
> > > 3x it's max value. It's safe to say that we're not talking to the
> brokers
> > > at all. Report.
> > > 2) fresh metadata && leader for a partition is not known && a batch is
> > > sitting there for longer than request.timeout.ms. This is one case we
> > > would
> > > like to improve and use batch.expiry.ms because request.timeout.ms is
> > too
> > > small.
> > > 3) fresh metadata && leader for a partition is known && batch is
> sitting
> > > there for longer than batch.expiry.ms. This is a new case that is
> > > different
> > > from #2. This is the catch-up mode case. Things are moving too slowly.
> > > Pipeline SLAs are broken. Report and shutdown kmm.
> > >
> > > The second and the third cases are useful to a real-time app for a
> > > completely different reason. Report, forget about the batch, and just
> > move
> > > on (without shutting down).
> > >
> > >
> > If I understand correctly, you are talking about a fork of apache kafka
> > which has these additional conditions? Because that check doesn't exist
> on
> > trunk today.
>
> Right. It is our internal release in LinkedIn.
>
> Or are you proposing to change the behavior of expiry to
> > account for stale metadata and partitioned producers as part of this KIP?
>
>
> No. It's our temporary solution in the absence of KIP-91. Note that we don't
> like increasing request.timeout.ms. Without our extra conditions our
> batches expire too soon--a problem in kmm catchup mode.
>
> If we get batch.expiry.ms, we will configure it to 20 mins. maybeExpire
> will use the config i

Re: [DISCUSS] KIP-186: Increase offsets retention default to 7 days

2017-08-10 Thread Vahid S Hashemian
+1 on both the KIP and Jason's suggestion of adding the consumer config.

Thanks.
--Vahid



From:   Jason Gustafson 
To: dev@kafka.apache.org
Date:   08/09/2017 09:40 AM
Subject:Re: [DISCUSS] KIP-186: Increase offsets retention default 
to 7 days



+1 on the bump to 7 days. Wanted to mention one minor point. The
OffsetCommit RPC still provides the ability to set the retention time from
the client, but we do not use it in the consumer. Should we consider 
adding
a consumer config to set this? Given the problems people had with the old
default, such a config would probably have gotten a fair bit of use. Maybe
it's less necessary with the new default, but there may be situations 
where
you don't want to keep the offsets for too long. For example, the console
consumer commits offsets with a generated group id. We might want to set a
low retention time to keep it from filling the offset cache with garbage
from such groups.

-Jason

On Wed, Aug 9, 2017 at 5:24 AM, Sönke Liebau <
soenke.lie...@opencore.com.invalid> wrote:

> Just had this create issues at a customer as well, +1
>
> On Wed, Aug 9, 2017 at 11:46 AM, Mickael Maison 

> wrote:
>
> > Yes the current default is too short, +1
> >
> > On Wed, Aug 9, 2017 at 8:56 AM, Ismael Juma  wrote:
> > > Thanks for the KIP, +1 from me.
> > >
> > > Ismael
> > >
> > > On Wed, Aug 9, 2017 at 1:24 AM, Ewen Cheslack-Postava <
> e...@confluent.io
> > >
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I posted a simple new KIP for a problem we see with a lot of users:
> > >> KIP-186: Increase offsets retention default to 7 days
> > >>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> 186%3A+Increase+offsets+retention+default+to+7+days
> > >>
> > >> Note that in addition to the KIP text itself, the linked JIRA 
already
> > >> existed and has a bunch of discussion on the subject.
> > >>
> > >> -Ewen
> > >>
> >
>
>
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>






Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-10 Thread Dong Lin
Hey Jun,

This is a very good idea. I have updated the KIP-113 so that
DescribeDirsResponse returns lag instead of LEO. If the replica is not a
temporary replica, then lag = max(0, HW - LEO). Otherwise, lag = primary
Replica's LEO - temporary Replica's LEO.
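
A minimal sketch of the two definitions above (names are illustrative, not the
KIP's actual code):

    final class ReplicaLag {
        static long lag(boolean temporaryReplica, long highWatermark, long leo,
                        long primaryReplicaLeo) {
            if (temporaryReplica) {
                // intra-broker move: how far the ".move" copy trails the ".log" copy
                return primaryReplicaLeo - leo;
            }
            // permanent replica: distance to the high watermark, floored at zero
            return Math.max(0L, highWatermark - leo);
        }
    }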

Thanks!
Dong

On Thu, Aug 10, 2017 at 10:21 AM, Jun Rao  wrote:

> Hi, Tom, Dong,
>
> A couple of comments on that.
>
> 1. I think we can unify the reporting of lags. Basically, the lag will be
> reported on every replica (temporary or permanent), not just at the leader
> replica. If it's permanent, lag is max(0, HW - LEO) as it is now.
> Otherwise, lag is (LEO of permanent replica - LEO of temporary replica).
> That way, it seems that we can use a single request to monitor the progress
> of both inter and intra replica movement and it would be more accurate than
> relying on LEO directly.
>
> 2. Tom, note that currently, the LeaderAndIsrRequest doesn't specify the
> log dir. So, I am not sure in your new proposal, how the log dir info is
> communicated to all brokers. Is the broker receiving the
> ReassignPartitionsRequest
> going to forward that to all brokers?
>
> Thanks,
>
> Jun
>
>
>
> On Thu, Aug 10, 2017 at 6:57 AM, Tom Bentley 
> wrote:
>
> > I've spent some time thinking about KIP-179 and KIP-113, the proposed
> > algorithms and APIs, and trying to weigh the pros and cons of various
> > alternative options.
> >
> > I think Dong's reasons for the algorithm for inter-broker move in KIP-113
> > make a lot of sense. I don't think it would be at all simple to try to
> > change that algorithm to one where the whole thing can be triggered by a
> > single call to an AdminClient method. So I guess we should try to keep as
> > much of that algorithm as possible.
> >
> > KIP-179 will need to change this step
> >
> >  - The script creates reassignment znode in zookeeper.
> > >
> >
> > with an AdminClient API call. This call can be the same one as currently
> > specified in KIP-179 -- reassignPartitions() -- except the argument needs
> > to take into account the need to pass log dirs as well as broker ids.
> Thus
> > I would suggest
> >
> > ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<ReplicAssignment>> assignments)
> >
> > where:
> >
> > class ReplicAssignment {
> > int broker()
> > String logDirectory()// can be null
> > }
> >
> > (This is just a Java representation of the reassignment json in KIP-113,
> > which itself is a superset of the reassignment json currently in use)
> >
> > The corresponding protocol would look like this:
> >
> > ReassignPartitionsRequest => timeout validate_only log_dirs
> > [topic_assignment]
> >   timout => int32
> >   validate_only => boolean
> >   log_dirs => [string]
> >   topic_assignment => topic [partition_assignment]
> > topic => string
> > partition_assignment => partition [replica]
> >   partition => int32
> >   replica => broker log_dir_index
> > broker => int32
> > log_dir_index => int16
> >
> > The purpose of log_dirs is to serialize each log dir in the request only
> > once. These are then referred to by index in log_dir_index. The
> > log_dir_index can be -1, which means the caller doesn't care which log
> > directory should be used on the receiving broker.
> >
> > This request can be sent to *any* broker. The broker which receives a
> > ReassignPartitionsRequest essentially converts it into reassignment JSON
> > and writes that JSON to the znode, then returns a
> > ReassignPartitionsResponse:
> >
> > ReassignPartitionsResponse => throttle_time_ms
> > [topic_assignment_result]
> >   throttle_time_ms => INT32
> >   log_dirs => [string]
> >   topic_assignment_result => topic partition_assignment_result
> > topic => STRING
> > partition_assignment_result => partition [replica_result]
> >   partition => int32
> >   replica_result => broker log_dir_index error_code
> > error_message
> > broker => int32
> > log_dir_index => int16
> > error_code => INT16
> >  error_message => NULLABLE_STRING
> >
> > This is using the same encoding scheme as wrt log_dirs as described
> above.
> >
> > Meanwhile the controller is notified by ZK of the change in value of the
> > znode and proceeds, as currently, by sending LeaderAndIsrRequest and
> > StopReplicaRequest in order to complete the reassignments.
> >
> > The remaining problem is around how to measure progress of reassignment.
> As
> > mentioned in the email I wrote this morning, I think we really need two
> > different lag calculations if we're using the lag to measure progress and
> > we want the property that lag=0 means reassignment has finished. The
> > problem with that, I now realise, is the script might be called with
> > reassignments which are a mix of:
> >
> > * inter-broker moves without a log dir (=> use HW-replicaLEO)
> > * inter-broker moves with a

[GitHub] kafka pull request #3644: KAFKA-5711: batch restore should handle deletes

2017-08-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3644


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #3646: MINOR: Remove unneeded error handlers in deprecate...

2017-08-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3646


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-10 Thread Jun Rao
Hi, Tom, Dong,

A couple of comments on that.

1. I think we can unify the reporting of lags. Basically, the lag will be
reported on every replica (temporary or permanent), not just at the leader
replica. If it's permanent, lag is max(0, HW - LEO) as it is now.
Otherwise, lag is (LEO of permanent replica - LEO of temporary replica).
That way, it seems that we can use a single request to monitor the progress
of both inter and intra replica movement and it would be more accurate than
relying on LEO directly.

2. Tom, note that currently, the LeaderAndIsrRequest doesn't specify the
log dir. So, I am not sure in your new proposal, how the log dir info is
communicated to all brokers. Is the broker receiving the
ReassignPartitionsRequest
going to forward that to all brokers?

Thanks,

Jun



On Thu, Aug 10, 2017 at 6:57 AM, Tom Bentley  wrote:

> I've spent some time thinking about KIP-179 and KIP-113, the proposed
> algorithms and APIs, and trying to weigh the pros and cons of various
> alternative options.
>
> I think Dong's reasons for the algorithm for inter-broker move in KIP-113
> make a lot of sense. I don't think it would be at all simple to try to
> change that algorithm to one where the whole thing can be triggered by a
> single call to an AdminClient method. So I guess we should try to keep as
> much of that algorithm as possible.
>
> KIP-179 will need to change this step
>
>  - The script creates reassignment znode in zookeeper.
> >
>
> with an AdminClient API call. This call can be the same one as currently
> specified in KIP-179 -- reassignPartitions() -- except the argument needs
> to take into account the need to pass log dirs as well as broker ids. Thus
> I would suggest
>
> ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<ReplicAssignment>> assignments)
>
> where:
>
> class ReplicAssignment {
> int broker()
> String logDirectory()// can be null
> }
>
> (This is just a Java representation of the reassignment json in KIP-113,
> which itself is a superset of the reassignment json currently in use)
>
> The corresponding protocol would look like this:
>
> ReassignPartitionsRequest => timeout validate_only log_dirs
> [topic_assignment]
>   timout => int32
>   validate_only => boolean
>   log_dirs => [string]
>   topic_assignment => topic [partition_assignment]
> topic => string
> partition_assignment => partition [replica]
>   partition => int32
>   replica => broker log_dir_index
> broker => int32
> log_dir_index => int16
>
> The purpose of log_dirs is to serialize each log dir in the request only
> once. These are then referred to by index in log_dir_index. The
> log_dir_index can be -1, which means the caller doesn't care which log
> directory should be used on the receiving broker.
>
> This request can be sent to *any* broker. The broker which receives a
> ReassignPartitionsRequest essentially converts it into reassignment JSON
> and writes that JSON to the znode, then returns a
> ReassignPartitionsResponse:
>
> ReassignPartitionsResponse => throttle_time_ms
> [topic_assignment_result]
>   throttle_time_ms => INT32
>   log_dirs => [string]
>   topic_assignment_result => topic partition_assignment_result
> topic => STRING
> partition_assignment_result => partition [replica_result]
>   partition => int32
>   replica_result => broker log_dir_index error_code
> error_message
> broker => int32
> log_dir_index => int16
> error_code => INT16
>  error_message => NULLABLE_STRING
>
> This is using the same encoding scheme as wrt log_dirs as described above.
>
> Meanwhile the controller is notified by ZK of the change in value of the
> znode and proceeds, as currently, by sending LeaderAndIsrRequest and
> StopReplicaRequest in order to complete the reassignments.
>
> The remaining problem is around how to measure progress of reassignment. As
> mentioned in the email I wrote this morning, I think we really need two
> different lag calculations if we're using the lag to measure progress and
> we want the property that lag=0 means reassignment has finished. The
> problem with that, I now realise, is the script might be called with
> reassignments which are a mix of:
>
> * inter-broker moves without a log dir (=> use HW-replicaLEO)
> * inter-broker moves with a log dir (=> use HW-replicaLEO)
> * intra-broker moves with a log dir (=> use .log_LEO - .move_LEO)
>
> And if there were two APIs we'd end up needing to make both kinds of query
> to each broker in the cluster. This morning I said:
>
> But AFAICS this observation doesn't really help much in terms of the APIs
> > concerned though. Since the requests would still need to go to different
> > brokers depending on which kind of movement is being performed.
> >
>
> But I wonder if that's *really* such a problem: In the case of an
> inter-broker move 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-10 Thread Tom Bentley
I've spent some time thinking about KIP-179 and KIP-113, the proposed
algorithms and APIs, and trying to weigh the pros and cons of various
alternative options.

I think Dong's reasons for the algorithm for inter-broker move in KIP-113
make a lot of sense. I don't think it would be at all simple to try to
change that algorithm to one where the whole thing can be triggered by a
single call to an AdminClient method. So I guess we should try to keep as
much of that algorithm as possible.

KIP-179 will need to change this step

 - The script creates reassignment znode in zookeeper.
>

with an AdminClient API call. This call can be the same one as currently
specified in KIP-179 -- reassignPartitions() -- except the argument needs
to take into account the need to pass log dirs as well as broker ids. Thus
I would suggest

ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<ReplicAssignment>> assignments)

where:

class ReplicAssignment {
int broker()
String logDirectory()// can be null
}

(This is just a Java representation of the reassignment json in KIP-113,
which itself is a superset of the reassignment json currently in use)
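
For illustration, a hypothetical usage of the proposed call (the
ReplicAssignment constructor and the reassignPartitions method are part of
this proposal, not a shipped API; the topic name and log dir paths are
placeholders):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    Map<TopicPartition, List<ReplicAssignment>> assignments = new HashMap<>();
    // move partition 0 to broker 1 (a specific log dir) and broker 2 (any log dir)
    assignments.put(new TopicPartition("my-topic", 0),
            Arrays.asList(new ReplicAssignment(1, "/data/kafka-logs-a"),
                          new ReplicAssignment(2, null)));
    ReassignPartitionsResult result = adminClient.reassignPartitions(assignments);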

The corresponding protocol would look like this:

ReassignPartitionsRequest => timeout validate_only log_dirs
[topic_assignment]
  timeout => int32
  validate_only => boolean
  log_dirs => [string]
  topic_assignment => topic [partition_assignment]
topic => string
partition_assignment => partition [replica]
  partition => int32
  replica => broker log_dir_index
broker => int32
log_dir_index => int16

The purpose of log_dirs is to serialize each log dir in the request only
once. These are then referred to by index in log_dir_index. The
log_dir_index can be -1, which means the caller doesn't care which log
directory should be used on the receiving broker.
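
A small sketch of that encoding (illustrative only): the request carries each
distinct log dir string once, and each replica entry refers to it by index,
with -1 meaning "any log dir on that broker".

    import java.util.ArrayList;
    import java.util.List;

    final class LogDirEncoder {
        private final List<String> logDirs = new ArrayList<>();

        // returns the log_dir_index to put in the replica entry
        short indexFor(String logDir) {
            if (logDir == null) return -1;  // caller doesn't care which dir
            int i = logDirs.indexOf(logDir);
            if (i < 0) {
                logDirs.add(logDir);
                i = logDirs.size() - 1;
            }
            return (short) i;
        }

        List<String> encodedLogDirs() { return logDirs; }
    }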

This request can be sent to *any* broker. The broker which receives a
ReassignPartitionsRequest essentially converts it into reassignment JSON
and writes that JSON to the znode, then returns a
ReassignPartitionsResponse:

ReassignPartitionsResponse => throttle_time_ms [topic_assignment_result]
  throttle_time_ms => INT32
  log_dirs => [string]
  topic_assignment_result => topic partition_assignment_result
topic => STRING
partition_assignment_result => partition [replica_result]
  partition => int32
  replica_result => broker log_dir_index error_code
error_message
broker => int32
log_dir_index => int16
error_code => INT16
 error_message => NULLABLE_STRING

This is using the same encoding scheme as wrt log_dirs as described above.

Meanwhile the controller is notified by ZK of the change in value of the
znode and proceeds, as currently, by sending LeaderAndIsrRequest and
StopReplicaRequest in order to complete the reassignments.

The remaining problem is around how to measure progress of reassignment. As
mentioned in the email I wrote this morning, I think we really need two
different lag calculations if we're using the lag to measure progress and
we want the property that lag=0 means reassignment has finished. The
problem with that, I now realise, is the script might be called with
reassignments which are a mix of:

* inter-broker moves without a log dir (=> use HW-replicaLEO)
* inter-broker moves with a log dir (=> use HW-replicaLEO)
* intra-broker moves with a log dir (=> use .log_LEO - .move_LEO)

And if there were two APIs we'd end up needing to make both kinds of query
to each broker in the cluster. This morning I said:

But AFAICS this observation doesn't really help much in terms of the APIs
> concerned though. Since the requests would still need to go to different
> brokers depending on which kind of movement is being performed.
>

But I wonder if that's *really* such a problem: In the case of an
inter-broker move we just need to ask the leader, and in the case of an
intra-broker move we just have to ask that broker. In general we'd need a
single request to each broker in the cluster. Then each broker would need
to service that request, but presumably it's just pulling a number out of a
ConcurrentHashMap, which is updated by the replica movement code in each of
the two cases (inter-broker and intra-broker). WDYT?
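
A sketch of that idea (names illustrative): the movement code updates a shared
map and the request handler just reads from it.

    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.kafka.common.TopicPartition;

    final class ReassignmentProgress {
        private final ConcurrentHashMap<TopicPartition, Long> lagByPartition =
                new ConcurrentHashMap<>();

        // called from the replica-movement code (inter- or intra-broker)
        void updateLag(TopicPartition tp, long lag) {
            lagByPartition.put(tp, lag);
        }

        // called when answering a progress query
        long lag(TopicPartition tp) {
            return lagByPartition.getOrDefault(tp, 0L);
        }
    }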

Assuming no one can see any glaring holes in what I'm proposing here, or
wants to suggest a workable alternative set of APIs and algorithms, then
I'll update KIP-179 to this effect.

Thanks for taking the time to read this far!

Tom

On 10 August 2017 at 11:56, Tom Bentley  wrote:

> Hi Dong and Jun,
>
> It seems that KIP-179 does not explicitly specify the definition of this
>> lag.
>
>
> Given that the definition of "caught up" is "is the replica in the ISR?",
> I found the code in Partition.maybeExpandIsr() which decides whether a
> replica should be added to the ISR and it uses replica.logEndOffset.
> offs

Re: [DISCUSS] KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

2017-08-10 Thread Damian Guy
> Got it, thanks.
>
> Does it still make sense to have one static constructors for each spec,
> with one constructor having only one parameter to make it more usable, i.e.
> as a user I do not need to give all parameters if I only want to override
> one of them? Maybe we can just name the constructors as `with` but I'm not
> sure if Java distinguish:
>
> public static <K, V> Produced<K, V> with(final Serde<K> keySerde)
> public static <K, V> Produced<K, V> with(final Serde<V> valueSerde)
>
> as two function signatures.
>
>
No that won't work. That is why we have all options, i.e., on Produced:
public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
final Serde<V>
valueSerde)
public static <K, V> Produced<K, V> with(final StreamPartitioner<K, V>
partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
public static <K, V> Produced<K, V> streamPartitioner(final
StreamPartitioner<K, V> partitioner)

So if you only want to use one you can just use the function that takes one
argument.
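
For example, assuming a KStream<String, Long> stream and the usual Serdes
helpers, usage under this proposal would look roughly like (a sketch of the
proposed API, not a released one; topic names are placeholders):

    // override only the key serde:
    stream.to("output-topic-a", Produced.keySerde(Serdes.String()));
    // override both key and value serdes:
    stream.to("output-topic-b", Produced.with(Serdes.String(), Serdes.Long()));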

>
> Guozhang
>
>
> On Wed, Aug 9, 2017 at 6:20 AM, Damian Guy  wrote:
>
> > On Tue, 8 Aug 2017 at 20:11 Guozhang Wang  wrote:
> >
> > > Damian,
> > >
> > > Thanks for the proposal, I had a few comments on the APIs:
> > >
> > > 1. Printed#withFile seems not needed, as users should always spec if it
> > is
> > > to sysOut or to File at the beginning. In addition as a second
> thought, I
> > > think serdes are not useful for prints anyways since we assume
> `toString`
> > > is provided except for byte arrays, in which we will special handle it.
> > >
> > >
> > +1
> >
> >
> > > Another comment about Printed in general is it differs with other
> options
> > > that it is a required option than optional one, since it includes
> > toSysOut
> > > / toFile specs; what are the pros and cons for including these two in
> the
> > > option and hence make it a required option than leaving them at the API
> > > layer and make Printed as optional for mapper / label only?
> > >
> > >
> > It isn't required as we will still have the no-arg print() which will
> just
> > go to sysout as it does now.
> >
> >
> > >
> > > 2.1 KStream#through / to
> > >
> > > We should have an overloaded function without Produced?
> > >
> >
> > Yes - we already have those so they are not part of the KIP, i.e,
> > through(topic)
> >
> >
> > >
> > > 2.2 KStream#groupBy / groupByKey
> > >
> > > We should have an overloaded function without Serialized?
> > >
> >
> > Yes, as above
> >
> > >
> > > 2.3 KGroupedStream#count / reduce / aggregate
> > >
> > > We should have an overloaded function without Materialized?
> > >
> >
> > As above
> >
> > >
> > > 2.4 KStream#join
> > >
> > > We should have an overloaded function without Joined?
> > >
> >
> > as above
> >
> > >
> > >
> > > 2.5 Each of KTable's operators:
> > >
> > > We should have an overloaded function without Produced / Serialized /
> > > Materialized?
> > >
> > >
> > as above
> >
> >
> > >
> > >
> > > 3.1 Produced: the static functions have overlaps, which seems not
> > > necessary. I'd suggest jut having the following three static with
> another
> > > three similar member functions:
> > >
> > > public static <K, V> Produced<K, V> withKeySerde(final Serde<K>
> > > keySerde)
> > >
> > > public static <K, V> Produced<K, V> withValueSerde(final Serde<V>
> > > valueSerde)
> > >
> > > public static <K, V> Produced<K, V> withStreamPartitioner(final
> > > StreamPartitioner<K, V> partitioner)
> > >
> > > The key idea is that by using the same function name string for static
> > > constructor and member functions, users do not need to remember what
> are
> > > the differences but can call these functions with any ordering they
> want,
> > > and later calls on the same spec will win over early calls.
> > >
> > >
> > That would be great if java supported it, but it doesn't. You can't have
> > static and member functions with the same signature.
> >
> >
> > >
> > > 3.2 Serialized: similarly
> > >
> > > public static <K, V> Serialized<K, V> withKeySerde(final Serde<K>
> > > keySerde)
> > >
> > > public static <K, V> Serialized<K, V> withValueSerde(final Serde<V>
> > > valueSerde)
> > >
> > > public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
> > >
> > > public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
> > >
> >
> > as above
> >
> >
> > >
> > > Also it has a final Serde otherValueSerde in one of its static
> > > constructors, is that intentional?
> > >
> >
> > Nope: thanks.
> >
> > >
> > > 3.3. Joined: similarly, keep the static constructor signatures the same
> > as
> > > its corresponding member fields.
> > >
> > >
> > As above
> >
> >
> > > 3.4 Materialized: it is a bit special, and I think we can keep its
> static
> > > constructors with only two `as` as they are today.
> > >
> > >
> > 4. Is there any modifications on StateStoreSupplier? Is it replaced by
> > > BytesStoreSupplier? Seems some more descriptions are lacking here. Also
> > in
> > >
> > >
> > No modifications to StateStoreSupplier. It is superseded by
> > BytesStoreSupplier.
> >
> >
> >
> > > public s

[GitHub] kafka pull request #3654: KAFKA-5562: Do streams state directory cleanup on ...

2017-08-10 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/3654

KAFKA-5562: Do streams state directory cleanup on a single thread

Backported from trunk: https://github.com/apache/kafka/pull/3516

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka cherry-pick-stream-thread-cleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3654.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3654


commit 5df82dabac2ccc1cb035f89ee45cf3405e42ab06
Author: Damian Guy 
Date:   2017-07-26T09:10:50Z

Use a single `StateDirectory` per streams instance.
Use threadId to determine which thread owns the lock.
Only allow the owning thread to unlock.
Execute cleanup on a scheduled thread in `KafkaStreams`

commit ddb66fde7c0c4cfb10107ce89531356ab99b8a21
Author: Damian Guy 
Date:   2017-08-10T11:39:42Z

backport stream thread cleaning to 0.11
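
A rough sketch of the lock-ownership rule described in the first commit
(illustrative only, not the actual StateDirectory code):

    import java.util.concurrent.ConcurrentHashMap;

    final class TaskLocks {
        private final ConcurrentHashMap<String, Long> lockOwners = new ConcurrentHashMap<>();

        // succeeds if the task directory is unlocked or already owned by this thread
        boolean lock(String taskDir) {
            long me = Thread.currentThread().getId();
            return lockOwners.merge(taskDir, me, (owner, self) -> owner) == me;
        }

        // only the owning thread may unlock
        boolean unlock(String taskDir) {
            return lockOwners.remove(taskDir, Thread.currentThread().getId());
        }
    }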




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #3653: KAFKA-5152: move state restoration out of rebalanc...

2017-08-10 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/3653

KAFKA-5152: move state restoration out of rebalance and into poll loop

In `onPartitionsAssigned`: 
1. release all locks for non-assigned suspended tasks.
2. resume any suspended tasks.
3. Create new tasks, but don't attempt to take the state lock.
4. Pause partitions for any new tasks.
5. set the state to `PARTITIONS_ASSIGNED`

In `StreamThread#runLoop`
1. poll
2. if state is `PARTITIONS_ASSIGNED`
 2.1  attempt to initialize any new tasks, i.e, take out the state locks 
and init state stores
 2.2 restore some data for changelogs, i.e., poll once on the restore 
consumer and return the partitions that have been fully restored
 2.3 update tasks with restored partitions and move any that have completed 
restoration to running
 2.4 resume consumption for any tasks where all partitions have been 
restored.
 2.5 if all active tasks are running, transition to `RUNNING` and assign 
standby partitions to the restoreConsumer.
 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka 0.11.0-restore-on-poll

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3653


commit 27016b9e9706ee95bcedd9a1408c71e62a0f178e
Author: Damian Guy 
Date:   2017-08-09T19:02:17Z

restore state on the poll loop




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-3389) ReplicaStateMachine areAllReplicasForTopicDeleted check not handling well case when there are no replicas for topic

2017-08-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3389.
--
Resolution: Won't Fix

As mentioned in the previous comment, this may not be an issue. Please reopen 
if it still exists.

> ReplicaStateMachine areAllReplicasForTopicDeleted check not handling well 
> case when there are no replicas for topic
> ---
>
> Key: KAFKA-3389
> URL: https://issues.apache.org/jira/browse/KAFKA-3389
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.1
>Reporter: Stevo Slavic
>Assignee: Manikumar
>Priority: Minor
>
> Line ReplicaStateMachine.scala#L285
> {noformat}
> replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
> {noformat}
> which is return value of {{areAllReplicasForTopicDeleted}} function/check, 
> probably should better be checking for
> {noformat}
> replicaStatesForTopic.isEmpty || replicaStatesForTopic.forall(_._2 == 
> ReplicaDeletionSuccessful)
> {noformat}
> I noticed it because in controller logs I found entries like:
> {noformat}
> [2016-03-04 13:27:29,115] DEBUG [Replica state machine on controller 1]: Are 
> all replicas for topic foo deleted Map() 
> (kafka.controller.ReplicaStateMachine)
> {noformat}
> even though normally they look like:
> {noformat}
> [2016-03-04 09:33:41,036] DEBUG [Replica state machine on controller 1]: Are 
> all replicas for topic foo deleted Map([Topic=foo,Partition=0,Replica=0] -> 
> ReplicaDeletionStarted, [Topic=foo,Partition=0,Replica=3] -> 
> ReplicaDeletionStarted, [Topic=foo,Partition=0,Replica=1] -> 
> ReplicaDeletionSuccessful) (kafka.controller.ReplicaStateMachine)
> {noformat}
> This may cause topic deletion request never to be cleared from ZK even when 
> topic has been deleted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-10 Thread Tom Bentley
Hi Dong and Jun,

It seems that KIP-179 does not explicitly specify the definition of this
> lag.


Given that the definition of "caught up" is "is the replica in the ISR?", I
found the code in Partition.maybeExpandIsr() which decides whether a
replica should be added to the ISR and it uses
replica.logEndOffset.offsetDiff(leaderHW)
>= 0, so for this purpose I would define the lag as max(leaderHW -
replicaLEO, 0). I think we agree this should work for inter-broker
movement, where the leader knows these quantities.

As Dong says, this doesn't work for the intra-broker case:

Note that we can not calculate lag as max(0, HW - LEO)
> because we still need the difference between two lags to measure the
> progress of intra-broker replica movement.
>

It seems to me that the intra-broker case is actually a special case of the
inter-broker case. Conceptually with an intra-broker move the ".log"
replica is the leader, the ".move" directory is the follower, the ISR is
the singleton containing the leader, and thus the HW is the LEO of the ".log".
Viewed in this way, Dong's method of leaderLEO - followerLEO is the same
thing for the intra-broker case as HW-LEO is for the inter-broker case.

But AFAICS this observation doesn't really help much in terms of the APIs
concerned though. Since the requests would still need to go to different
brokers depending on which kind of movement is being performed.

So perhaps this is another case where maybe it makes sense to keep the two
APIs separate, one API for measuring inter-broker movement progress an
another for the intra-broker case. WDYT?

Thanks for the continuing discussion on this!

Tom


On 10 August 2017 at 05:28, Dong Lin  wrote:

> Hey Jun,
>
> I have been thinking about whether it is better to return lag (i.e. HW -
> LEO) instead of LEO. Note that the lag in the DescribeDirsResponse may be
> negative if LEO > HW. It will almost always be negative for leader and
> in-sync replicas. Note that we can not calculate lag as max(0, HW - LEO)
> because we still need the difference between two lags to measure the
> progress of intra-broker replica movement. The AdminClient API can choose
> to return max(0, HW - LEO) depending on whether it is used for tracking
> progress of inter-broker reassignment or intra-broker movement. Is it OK?
> If so, I will update the KIP-113 accordingly to return lag in the
> DescribeDirsResponse .
>
> Thanks,
> Dong
>
>
>
>
> On Wed, Aug 9, 2017 at 5:06 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Yes, the lag in a replica is calculated as the difference of LEO of the
> > replica and the HW. So, as long as a replica is in sync, the lag is
> always
> > 0.
> >
> > So, I was suggesting to return lag instead of LEO in DescribeDirsResponse
> > for each replica. I am not sure if we need to return HW though.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Aug 9, 2017 at 5:01 PM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > It just came to me that you may be assuming that follower_lag = HW -
> > > follower_LEO. If that is the case, then we need to have new
> > > request/response to retrieve this lag since the DescribeDirsResponse
> > > doesn't even include HW. It seems that KIP-179 does not explicitly
> > specify
> > > the definition of this lag.
> > >
> > > I have been assuming that follow_lag = leader_LEO - follower_LEO given
> > that
> > > the request is used to query the reassignment status. Strictly speaking
> > the
> > > difference between leader_LEO and the HW is limited by the amount of
> data
> > > produced in KafkaConfig.replicaLagTimeMaxMs, which is 10 seconds. I
> also
> > > assumed that 10 seconds is probably not a big deal given the typical
> time
> > > length of the reassignment.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Aug 9, 2017 at 4:40 PM, Dong Lin  wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > If I understand you right, you are suggesting that, in the case when
> > > there
> > > > is continuous incoming traffic, the approach in the KIP-179 will
> report
> > > lag
> > > > as 0 whereas the approach using DescribeDirsRequest will report lag
> as
> > > > non-zero. But I think the approach in KIP-179 will also report
> non-zero
> > > lag
> > > > when there is continuous traffic. This is because at the time the
> > leader
> > > > receives ReplicaStatusRequest, it is likely that some data has been
> > > > appended to the partition after the last FetchRequest from the
> > follower.
> > > > Does this make sense?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > >
> > > > On Wed, Aug 9, 2017 at 4:24 PM, Jun Rao  wrote:
> > > >
> > > >> Hi, Dong,
> > > >>
> > > >> As for whether to return LEO or lag, my p

[GitHub] kafka pull request #3652: MINOR: change log level in ThreadCache to trace

2017-08-10 Thread dguy
Github user dguy closed the pull request at:

https://github.com/apache/kafka/pull/3652


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-5723) Refactor BrokerApiVersionsCommand to use the KafkaAdminClient

2017-08-10 Thread Viktor Somogyi (JIRA)
Viktor Somogyi created KAFKA-5723:
-

 Summary: Refactor BrokerApiVersionsCommand to use the 
KafkaAdminClient
 Key: KAFKA-5723
 URL: https://issues.apache.org/jira/browse/KAFKA-5723
 Project: Kafka
  Issue Type: Improvement
Reporter: Viktor Somogyi
Assignee: Viktor Somogyi


Currently it uses the deprecated AdminClient and in order to remove usages of 
that client, this class needs to be refactored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5722) Refactor ConfigCommand to use the new AdminClient

2017-08-10 Thread Viktor Somogyi (JIRA)
Viktor Somogyi created KAFKA-5722:
-

 Summary: Refactor ConfigCommand to use the new AdminClient
 Key: KAFKA-5722
 URL: https://issues.apache.org/jira/browse/KAFKA-5722
 Project: Kafka
  Issue Type: Improvement
Reporter: Viktor Somogyi
Assignee: Viktor Somogyi


The ConfigCommand currently uses a direct connection to zookeeper. The 
zookeeper dependency should be deprecated and an AdminClient API created to be 
used instead.
This change requires a KIP.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-185: Make exactly once in order delivery per partition the default producer setting

2017-08-10 Thread Dong Lin
Hey Apurva,

Thanks for the KIP. I have read through the KIP and the prior discussion in
this thread. I have three concerns that are related to Becket's comments:

- Is it true that, as Becket has mentioned, producer.flush() may block
infinitely if retries=MAX_INT? This seems like a possible way to break
users' applications. I think we should probably avoid causing correctness
penalties for applications.

- It seems that OutOfOrderSequenceException will be a new exception thrown
to user after this config change. Can you clarify whether this will cause
correctness penalty for application?

- It is not very clear to me whether the benefit of increasing acks from 1
to all is worth the performance hit. For users who have not already
overridden acks to all, it is very likely that they are not already doing
other complicated work (e.g. close producer in callback) that are necessary
for exactly-once delivery. Thus those users won't have exactly-once
semantics by simply picking up the change in the default acks
configuration. It seems that the only benefit of this config change is the
well-known tradeoff between performance and message loss rate. I am not
sure this is a strong reason to risk reducing existing user's performance.

I think my point is that we should not make changes that will break
users' existing applications. And we should try to avoid reducing users'
performance unless there is a strong benefit to doing so (e.g. exactly-once).

Thanks,
Dong




On Wed, Aug 9, 2017 at 10:43 PM, Apurva Mehta  wrote:

> Thanks for your email Becket.
>
> Your observations around using acks=1 and acks=-1 are correct. Do note that
> getting an OutOfOrderSequence means that acknowledged data has been lost.
> This could be due to a weaker acks setting like acks=1 or due to a topic
> which is not configured to handle broker failures cleanly (unclean leader
> election is enabled, etc.). Either way, you are right in observing that if
> an app is very serious about having exactly one copy of each ack'd message
> in the log, it is a significant effort to recover from this error.
>
> However, I propose an alternate way of thinking about this: is it
> worthwhile shipping Kafka with the defaults tuned for strong semantics?
> That is essentially what is being proposed here, and of course there will
> be tradeoffs with performance and deployment costs-- you can't have your
> cake and eat it too.
>
> And if we want to ship Kafka with strong semantics by default, we might
> want to make the default topic level settings as well as the client
> settings more robust. This means, for instance, disabling unclean leader
> election by default. If there are other configs we need to change on the
> broker side to ensure that ack'd messages are not lost due to transient
> failures, we should change those as well as part of a future KIP.
>
> Personally, I think that the defaults should provide robust guarantees.
>
> And this brings me to another point: these are just proposed defaults.
> Nothing is being taken away in terms of flexibility to tune for different
> behavior.
>
> Finally, the way idempotence is implemented means that there needs to be
> some cap on max.in.flight when idempotence is enabled -- that is just a
> tradeoff of the feature. Do we have any data that there are installations
> which benefit greatly for a value of max.in.flight > 5? For instance,
> LinkedIn probably has the largest and most demanding deployment of Kafka.
> Are there any applications which use max.inflight > 5? That would be good
> data to have.
>
> Thanks,
> Apurva
>
>
>
>
>
> On Wed, Aug 9, 2017 at 2:59 PM, Becket Qin  wrote:
>
> > Thanks for the KIP, Apurva. It is a good time to review the
> configurations
> > to see if we can improve the user experience. We also might need to think
> > from users standpoint about the out of the box experience.
> >
> > 01. Generally speaking, I think it makes sense to make idempotence=true
> so
> > we can enable producer side pipeline without ordering issue. However, the
> > impact is that users may occasionally receive OutOfOrderSequencException.
> > In this case, there is not much user can do if they want to ensure
> > ordering. They basically have to close the producer in the call back and
> > resend all the records that are in the RecordAccumulator. This is very
> > involved. And the users may not have a way to retrieve the Records in the
> > accumulator anymore. So for the users who really want to achieve the
> > exactly once semantic, there are actually still a lot of work to do even
> > with those default. For the rest of the users, they need to handle one
> more
> > exception, which might not be a big deal.
> >
> > 02. Setting acks=-1 would significantly reduce the likelihood of
> > OutOfOrderSequenceException from happening. However, the
> latency/throughput
> > impact and additional purgatory burden on the broker are big concerns.
> And
> > it does not really guarantee exactly once without broker side
> > configura

[GitHub] kafka pull request #3652: MINOR: change log level in ThreadCache to trace

2017-08-10 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/3652

MINOR: change log level in ThreadCache to trace

cache eviction logging at debug level is too high volume

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka minor-cache-log-level

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3652


commit fd02e947264be60a92f358be4b66f57c0722
Author: Damian Guy 
Date:   2017-08-10T07:42:28Z

change log level in ThreadCache to trace




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---