Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-04 Thread Anna Povzner
Hi David and Jun,

I dug a bit deeper into the Rate implementation, and I want to confirm that
I do believe the token bucket behavior is better, for the reasons we already
discussed; let me summarize them. The main difference between Rate and a
token bucket is that the Rate implementation allows a burst by borrowing
from the future, whereas a token bucket allows a burst by spending tokens
accumulated during a previous idle period. Using accumulated tokens smoothes
out the rate measurement in general. With the Rate implementation, allowing
a large burst requires configuring a large quota window, which then causes
long delays for bursty workloads because credits are borrowed from the
future. Perhaps it would be useful to add a summary of this at the beginning
of the Throttling Algorithm section?
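
For reference, here is a minimal token bucket sketch (generic Java, just to
illustrate the accumulate-then-spend behavior; it is not the implementation
proposed in the KIP and the names are made up):

    // Minimal token bucket: tokens accumulate while idle (up to a burst cap)
    // and a request is admitted only if enough tokens are available.
    public final class TokenBucket {
        private final double ratePerSec;   // sustained quota, e.g. mutations/sec
        private final double burst;        // maximum tokens that can accumulate
        private double tokens;
        private long lastRefillNanos;

        public TokenBucket(double ratePerSec, double burst) {
            this.ratePerSec = ratePerSec;
            this.burst = burst;
            this.tokens = burst;
            this.lastRefillNanos = System.nanoTime();
        }

        public synchronized boolean tryAcquire(double cost) {
            refill();
            if (tokens >= cost) {
                tokens -= cost;
                return true;
            }
            return false;
        }

        private void refill() {
            long now = System.nanoTime();
            double elapsedSec = (now - lastRefillNanos) / 1_000_000_000.0;
            tokens = Math.min(burst, tokens + elapsedSec * ratePerSec);
            lastRefillNanos = now;
        }
    }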

In my previous email, I mentioned the issue we observed with the bandwidth
quota, where a low quota (1MB/s per broker) was limiting bandwidth visibly
below the quota. I thought it was strictly the issue with the Rate
implementation as well, but I found a root cause to be different but
amplified by the Rate implementation (long throttle delays of requests in a
burst). I will describe it here for completeness using the following
example:

   - Quota = 1MB/s, default window size and number of samples.

   - Suppose there are 6 connections (maximum 6 outstanding requests), and
     each produce request is 5MB. If all requests arrive in a burst, the last
     4 requests (20MB over the 10MB allowed in a window) may get the same
     throttle time if they are processed concurrently. We record the rate
     under the lock, but then calculate the throttle time separately after
     that. So, for each request, the observed rate could be 3MB/s, and each
     request gets throttle delay = 20 seconds (instead of 5, 10, 15, 20
     seconds respectively). The delay is longer than the total rate window,
     which results in lower bandwidth than the quota. Since all requests got
     the same delay, they will also arrive back in a burst, which may again
     result in a longer delay than necessary. It looks pretty easy to fix, so
     I will open a separate JIRA for it. This can be additionally mitigated
     by token bucket behavior. (See the sketch of the delay calculation right
     after this list.)
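
To make the arithmetic concrete, a small sketch of how the 20-second delay
falls out of the numbers above (assuming the usual "delay until the measured
rate falls back to the quota" formula; this is not the exact broker code):

    // Sketch of the delay calculation from the example above.
    public class ThrottleDelayExample {
        public static void main(String[] args) {
            double quotaMbPerSec = 1.0;   // quota = 1 MB/s
            double windowSec = 10.0;      // approx. total rate window (samples * sample length)
            double recordedMb = 6 * 5.0;  // six 5MB requests all recorded before delays are computed

            double observedRate = recordedMb / windowSec;  // 3 MB/s
            // delay such that the rate measured over (window + delay) falls back to the quota
            double delaySec = (observedRate - quotaMbPerSec) / quotaMbPerSec * windowSec;  // 20 s

            System.out.printf("observed rate = %.1f MB/s, throttle delay = %.0f s%n",
                    observedRate, delaySec);
        }
    }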


For the algorithm "So instead of having one sample equal to 560 in the last
window, we will have 100 samples equal to 5.6.", I agree with Jun. I would
allocate 5 per each old sample that is still in the overall window. It
would be a bit larger granularity than the pure token bucket (we lose 5
units / mutation once we move past the sample window), but it is better
than the long delay.

Thanks,

Anna


On Thu, Jun 4, 2020 at 6:33 PM Jun Rao  wrote:

> Hi, David, Anna,
>
> Thanks for the discussion and the updated wiki.
>
> 11. If we believe the token bucket behavior is better in terms of handling
> the burst behavior, we probably don't need a separate KIP since it's just
> an implementation detail.
>
> Regarding "So instead of having one sample equal to 560 in the last window,
> we will have 100 samples equal to 5.6.", I was thinking that we will
> allocate 5 to each of the first 99 samples and 65 to the last sample. Then,
> 6 new samples have to come before the balance becomes 0 again. Intuitively,
> we are accumulating credits in each sample. If a usage comes in, we first
> use all existing credits to offset that. If we can't, the remaining usage
> will be recorded in the last sample, which will be offset by future
> credits. That seems to match the token bucket behavior the closest.
>
> 20. Could you provide some guidelines on the typical rate that an admin
> should set?
>
> Jun
>
> On Thu, Jun 4, 2020 at 8:22 AM David Jacot  wrote:
>
> > Hi all,
> >
> > I just published an updated version of the KIP which includes:
> > * Using a slightly modified version of our Rate. I have tried to
> formalize
> > it based on our discussion. As Anna suggested, we may find a better way
> to
> > implement it.
> > * Handling of ValidateOnly as pointed out by Tom.
> >
> > Please, check it out and let me know what you think.
> >
> > Best,
> > David
> >
> > On Thu, Jun 4, 2020 at 4:57 PM Tom Bentley  wrote:
> >
> > > Hi David,
> > >
> > > As a user I might expect the validateOnly option to do everything
> except
> > > actually make the changes. That interpretation would imply the quota
> > should
> > > be checked, but the check should obviously be side-effect free. I think
> > > this interpretation could be useful because it gives the caller either
> > some
> > > confidence that they're not going to hit the quota, or tell them, via
> the
> > > exception, when they can expect the call to work. But for this to be
> > useful
> > > it would require the retry logic to not retry the request when
> > validateOnly
> > > was set.
> > >
> > > On the other hand, if validateOnly is really about validating only some
> > > aspects of the request (which maybe is what the name implies), then we
> > > should clarify in the Javadoc that the quota is not included in the
> > > validation.
> > >
> > > On balance, I agree with what y

[VOTE] KIP-620 Deprecate ConsumerConfig#addDeserializerToConfig(Properties, Deserializer, Deserializer) and ProducerConfig#addSerializerToConfig(Properties, Serializer, Serializer)

2020-06-04 Thread Chia-Ping Tsai
hi All,

I would like to start the vote on KIP-620:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118

--
Chia-Ping


Re: [DISCUSS] KIP-619 Deprecate ConsumerConfig#addDeserializerToConfig(Properties, Deserializer, Deserializer) and ProducerConfig#addSerializerToConfig(Properties, Serializer, Serializer)

2020-06-04 Thread Chia-Ping Tsai
> I think the KIP is quite straightforward and you could even skip the
> DISCUSS and call for a VOTE directly.

Copy that

On 2020/06/04 23:43:12, "Matthias J. Sax"  wrote: 
> Btw:
> 
> I think the KIP is quite straightforward and you could even skip the
> DISCUSS and call for a VOTE directly.
> 
> 
> -Matthias
> 
> On 6/4/20 4:40 PM, Matthias J. Sax wrote:
> > @Chia-Ping
> > 
> > Can you maybe start a new DISCUSS thread using the new KIP number? This
> > would help to keep the threads separated.
> > 
> > Thanks!
> > 
> > 
> > -Matthias
> > 
> > On 6/3/20 6:56 AM, Chia-Ping Tsai wrote:
> >> When I created the KIP, the next number was 619 and not sure why the 
> >> number is out of sync.
> >>
> >> At any rate, I will update the KIP number :_
> >>
> >> On 2020/06/03 05:06:39, Cheng Tan  wrote: 
> >>> Hi Chia, 
> >>>
> >>> Hope you are doing well. I already took KIP-619 as my KIP identification 
> >>> number. Could you change your KIP id? Thank you.
> >>>
> >>> Best, - Cheng
> >>>
>  On May 31, 2020, at 8:08 PM, Chia-Ping Tsai  wrote:
> 
>  hi All,
> 
>  This KIP plans to deprecate two unused methods without replacement.
> 
>  All suggestions are welcome!
> 
>  KIP: 
>  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118
>  ISSUE: https://issues.apache.org/jira/browse/KAFKA-10044
> 
>  ---
>  Chia-Ping
> >>>
> >>>
> > 
> 
> 


[jira] [Created] (KAFKA-10106) measure and log time taken to handle LeaderAndIsr request

2020-06-04 Thread NIKHIL (Jira)
NIKHIL created KAFKA-10106:
--

 Summary: measure and log time taken to handle LeaderAndIsr request 
 Key: KAFKA-10106
 URL: https://issues.apache.org/jira/browse/KAFKA-10106
 Project: Kafka
  Issue Type: Improvement
Reporter: NIKHIL


ReplicaManager#becomeLeaderOrFollower handles the LeaderAndIsr request. 
StateChangeLogger logs when this request is handled; however, it would be useful 
to also log when the call ends and record the time taken, which can help operationally. 

The proposal is for ReplicaManager#becomeLeaderOrFollower to start measuring the time 
before the `replicaStateChangeLock` is acquired and to log the elapsed time before the 
response is returned. 
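
A rough illustration of the kind of measurement proposed (a generic Java sketch
only; the real change would live in the Scala ReplicaManager and the names here
are hypothetical):

    import java.util.function.Supplier;

    // Generic sketch: measure and log the time taken by a request handler.
    public final class TimedHandler {
        public static <T> T timeAndLog(String what, Supplier<T> handler) {
            long startNanos = System.nanoTime();  // start before the lock is taken inside the handler
            try {
                return handler.get();
            } finally {
                long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
                System.out.println("Handled " + what + " in " + elapsedMs + " ms");
            }
        }
    }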

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10105) Regression in group coordinator dealing with flaky clients joining while leaving

2020-06-04 Thread William Reynolds (Jira)
William Reynolds created KAFKA-10105:


 Summary: Regression in group coordinator dealing with flaky 
clients joining while leaving
 Key: KAFKA-10105
 URL: https://issues.apache.org/jira/browse/KAFKA-10105
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.4.1
 Environment: Kafka 1.1.0 on jre 8 on debian 9 in docker
Kafka 2.4.1 on jre 11 on debian 9 in docker
Reporter: William Reynolds


Since upgrading a cluster from 1.1.0 to 2.4.1, the broker no longer deals 
correctly with a consumer sending a join after a leave.

What happens now is that if a consumer sends a leave and then, while shutting 
down, follows up by trying to send a join again, the group coordinator adds the 
leaving member back to the group but never seems to expire that member for 
missing heartbeats.

Since the consumer is gone by then, when it joins again after restarting it is 
added as a new member, but the zombie member is still there and is included in 
the partition assignment, which means that those partitions never get consumed 
from. What can also happen is that one of the zombies becomes group leader, so 
the rebalance gets stuck forever and the group is entirely blocked.

I have not been able to track down where this was introduced between 1.1.0 and 
2.4.1, but I will look further into it. Unfortunately the logs are essentially 
silent about the zombie members, and I only had INFO level logging on during the 
issue. By stopping all the consumers in the group and restarting the broker 
coordinating that group we could get back to a working state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk14 #178

2020-06-04 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Add explanation for disabling forwarding from value transformers

[github] KAFKA-10066: TestOutputTopic should pass record headers into

[github] KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric


--
[...truncated 6.31 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyT

Re: [DISCUSS] KIP-513: Distinguish between Key and Value serdes in scala wrapper library for kafka streams

2020-06-04 Thread Yuriy Badalyantc
Hi Matthias,

I think you misunderstood my diagram. I wanted to show the next
relationships:
- Serde is KeySerde and ValueSerde
- KeySerde is KeySerializer and KeyDeserializer
- ValueSerde is ValueSerializer and ValueDeserializer

So, `Serde` is the most "powerful" class: it can do anything. `KeySerde`
and `ValueSerde` are not specialized `Serde`s, but components of a `Serde`.

Implementation of these relationships through inheritance will look like
this (pseudocode):
trait KeySerializer
trait KeyDeserializer
trait ValueSerializer
trait ValueDeserializer
trait KeySerde extends KeySerializer with KeyDeserializer
trait ValueSerde extends ValueSerializer with ValueDeserializer
trait Serde extends KeySerde with ValueSerde

So, everywhere you need, for example, a `KeyDeserializer`, you could
use a `KeyDeserializer`, a `KeySerde`, or a `Serde`.
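
For illustration, the same relationships written as plain Java interfaces
(simplified and hypothetical; these are not the actual Kafka classes):

    // Simplified sketch of the proposed hierarchy; not the real Kafka interfaces.
    interface KeySerializer<T>     { byte[] serializeKey(T value); }
    interface KeyDeserializer<T>   { T deserializeKey(byte[] bytes); }
    interface ValueSerializer<T>   { byte[] serializeValue(T value); }
    interface ValueDeserializer<T> { T deserializeValue(byte[] bytes); }

    interface KeySerde<T>   extends KeySerializer<T>, KeyDeserializer<T> {}
    interface ValueSerde<T> extends ValueSerializer<T>, ValueDeserializer<T> {}
    interface Serde<T>      extends KeySerde<T>, ValueSerde<T> {}

    class Example {
        // Anything that needs only a KeyDeserializer also accepts a KeySerde or a full Serde.
        static <T> T readKey(KeyDeserializer<T> deserializer, byte[] bytes) {
            return deserializer.deserializeKey(bytes);
        }
    }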

-Yuriy

On Fri, Jun 5, 2020 at 7:10 AM Matthias J. Sax  wrote:

> As you say "hierarchy" I read this really as "class hierarchy". For this
> case, I think that we need to do it differently.
>
> I agree to this part:
>
>   KeySerde extends Serde
>   ValueSerde extends Serde
>
> However,
>
>   KeySerializer extends KeySerde (etc)
>
> does not make sense IMHO, because a `KeySerializer` is not a special
> KeySerde; it's only a serializer but it's not a deserializer.
>
> In fact the hierarchy goes the other direction:
>
>   Serde extends Serializer, Deserializer
>
>
> Atm, a Serde is just a "container" for serializers and desrialzier though.
>
>
> -Matthias
>
>
>
> On 6/2/20 10:40 PM, Yuriy Badalyantc wrote:
> > Hi.
> >
> > I'm the author of the KIP-616. I got acquainted with the KIP-513 and I
> > think overall it's a good idea. But I think workaround only on the scala
> > side is not enough. We could consider moving a bit further and change
> serde
> > hierarchy on the java side to something like this:
> >
> > KeySerializer-↘
> >KeySerde---↘
> > KeyDeserializer---↗↓
> >Serde
> > ValueSerializer---↘↑
> >ValueSerde-↗
> > ValueDeserializer-↗
> >
> > I understand that this is a bit too revolutionary, but I think this is
> the
> > right direction.
> >
> > Regarding KIP-616 and KIP-513 connection. It's hard to say at the current
> > moment how should we implement these kips: together or separately. It
> looks
> > like there is no consensus on the implementation details for these kips.
> >
> > If we decided to create a new `KeySerde` and `ValueSerde` on the scala
> side
> > (current plan without my proposal above), we could use their companions
> for
> > default instances. In this case, it looks like we don't need to do
> KIP-616.
> > But what about backward compatibility? What should we do with
> > `org.apache.kafka.streams.scala.Serdes`? Deprecate it?
> >
> > - Yuriy
> >
> > On Sun, May 31, 2020 at 1:24 AM John Roesler 
> wrote:
> >
> >> Hi Mykhailo,
> >>
> >> Wow, I really dropped the ball here. I have just looked over your KIP
> >> again, and now I see how you don’t need to change every dsl method, only
> >> Consumed, Materialized, etc.
> >>
> >> I think this would be a good addition. Yuriy has just proposed KIP-616
> to
> >> fix some other problems with the implicit serdes. I’m wondering if these
> >> two kips have any joint opportunities we should consider, or if it’s
> better
> >> to continue to consider them separately.
> >>
> >> Thanks,
> >> John
> >>
> >> On Wed, Jan 22, 2020, at 16:18, Михаил Ерёменко wrote:
> >>> Hi, John!
> >>>
> >>> Sorry for the late reply. I am not really familiar with this mail list
> >>> discussions, so I have not seen your mails.
> >>>
> >>> Regarding your question:
>    I guess what
> >>>   I'm struggling with is why you actually want to have different key
> and
> >>>   serdes for the same type
> >>>
> >>> I think good example will be (and it is actually what we do in ours
> >>> project) using confluent schema registry in conjunction with kafka
> >>> streams. Some models can be used as keys as well as values. When we
> >>> define schema registry compatible serde, we have to specify is it for
> >>> key or not. We can of course create two serdes for the same model, but
> >>> in this case implicit semantic will not work because scala doesn’t know
> >>> which implicit to pick. And things become even more complicated in case
> >>> if you will try to derive your serdes (we derive serdes in ours
> >>> project).
> >>>
> >>> One more thing:
>  every method in the streams-scala DSL.
> >>>
> >>> So far we've just changed
> >>> org.apache.kafka.streams.scala.ImplicitConversions and
> >>> org.apache.kafka.streams.scala.kstream.Materialized and it works for
> >>> us. Also we did introduce default serdes for primitive types.
> >>>
> >>> Regards,
> >>> Mykhailo
> >>
> >
>
>


Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-04 Thread Jun Rao
Hi, David, Anna,

Thanks for the discussion and the updated wiki.

11. If we believe the token bucket behavior is better in terms of handling
the burst behavior, we probably don't need a separate KIP since it's just
an implementation detail.

Regarding "So instead of having one sample equal to 560 in the last window,
we will have 100 samples equal to 5.6.", I was thinking that we will
allocate 5 to each of the first 99 samples and 65 to the last sample. Then,
6 new samples have to come before the balance becomes 0 again. Intuitively,
we are accumulating credits in each sample. If a usage comes in, we first
use all existing credits to offset that. If we can't, the remaining usage
will be recorded in the last sample, which will be offset by future
credits. That seems to match the token bucket behavior the closest.
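
For illustration, a rough sketch of that allocation step (generic Java, with
made-up names; this is not actual Kafka code):

    // Spread a burst of usage over the existing samples, filling each sample up to the
    // per-sample quota and recording whatever is left in the most recent sample.
    // e.g. usage = 560, perSampleQuota = 5, 100 samples -> 5 in each of the first 99, 65 in the last.
    public final class SampleSpreadExample {
        static void record(double[] samples, double perSampleQuota, double usage) {
            for (int i = 0; i < samples.length - 1 && usage > 0; i++) {
                double free = Math.max(0, perSampleQuota - samples[i]);
                double take = Math.min(free, usage);
                samples[i] += take;
                usage -= take;
            }
            samples[samples.length - 1] += usage;  // remainder goes into the latest sample
        }

        public static void main(String[] args) {
            double[] samples = new double[100];
            record(samples, 5.0, 560.0);
            System.out.println(samples[0] + " ... " + samples[98] + " / last = " + samples[99]);
            // prints: 5.0 ... 5.0 / last = 65.0
        }
    }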

20. Could you provide some guidelines on the typical rate that an admin
should set?

Jun

On Thu, Jun 4, 2020 at 8:22 AM David Jacot  wrote:

> Hi all,
>
> I just published an updated version of the KIP which includes:
> * Using a slightly modified version of our Rate. I have tried to formalize
> it based on our discussion. As Anna suggested, we may find a better way to
> implement it.
> * Handling of ValidateOnly as pointed out by Tom.
>
> Please, check it out and let me know what you think.
>
> Best,
> David
>
> On Thu, Jun 4, 2020 at 4:57 PM Tom Bentley  wrote:
>
> > Hi David,
> >
> > As a user I might expect the validateOnly option to do everything except
> > actually make the changes. That interpretation would imply the quota
> should
> > be checked, but the check should obviously be side-effect free. I think
> > this interpretation could be useful because it gives the caller either
> some
> > confidence that they're not going to hit the quota, or tell them, via the
> > exception, when they can expect the call to work. But for this to be
> useful
> > it would require the retry logic to not retry the request when
> validateOnly
> > was set.
> >
> > On the other hand, if validateOnly is really about validating only some
> > aspects of the request (which maybe is what the name implies), then we
> > should clarify in the Javadoc that the quota is not included in the
> > validation.
> >
> > On balance, I agree with what you're proposing, since the extra utility
> of
> > including the quota in the validation seems to be not worth the extra
> > complication for the retry.
> >
> > Thanks,
> >
> > Tom
> >
> >
> >
> > On Thu, Jun 4, 2020 at 3:32 PM David Jacot  wrote:
> >
> > > Hi Tom,
> > >
> > > That's a good question. As the validation does not create any load on
> the
> > > controller, I was planning to do it without checking the quota at all.
> > Does
> > > that
> > > sound reasonable?
> > >
> > > Best,
> > > David
> > >
> > > On Thu, Jun 4, 2020 at 4:23 PM David Jacot 
> wrote:
> > >
> > > > Hi Jun and Anna,
> > > >
> > > > Thank you both for your replies.
> > > >
> > > > Based on our recent discussion, I agree that using a rate is better
> to
> > > > remain
> > > > consistent with other quotas. As you both suggested, it seems that
> > > changing
> > > > the way we compute the rate to better handle spiky workloads and
> > behave a
> > > > bit more similarly to the token bucket algorithm makes sense for all
> > > > quotas as
> > > > well.
> > > >
> > > > I will update the KIP to reflect this.
> > > >
> > > > Anna, I think that we can explain this in this KIP. We can't just say
> > > that
> > > > the Rate
> > > > will be updated in this KIP. I think that we need to give a bit more
> > > info.
> > > >
> > > > Best,
> > > > David
> > > >
> > > > On Thu, Jun 4, 2020 at 6:31 AM Anna Povzner 
> wrote:
> > > >
> > > >> Hi Jun and David,
> > > >>
> > > >> Regarding token bucket vs, Rate behavior. We recently observed a
> > couple
> > > of
> > > >> cases where a bursty workload behavior would result in long-ish
> pauses
> > > in
> > > >> between, resulting in lower overall bandwidth than the quota. I will
> > > need
> > > >> to debug this a bit more to be 100% sure, but it does look like the
> > case
> > > >> described by David earlier in this thread. So, I agree with Jun -- I
> > > think
> > > >> we should make all quota rate behavior consistent, and probably
> > similar
> > > to
> > > >> the token bucket one.
> > > >>
> > > >> Looking at KIP-13, it doesn't describe rate calculation in enough
> > > detail,
> > > >> but does mention window size. So, we could keep "window size" and
> > > "number
> > > >> of samples" configs and change Rate implementation to be more
> similar
> > to
> > > >> token bucket:
> > > >> * number of samples define our burst size
> > > >> * Change the behavior, which could be described as: If a burst
> happens
> > > >> after an idle period, the burst would effectively spread evenly over
> > the
> > > >> (now - window) time period, where window is (<number of samples> - 1) *
> > > >> <window size>. Which basically describes a token bucket, while keeping
> > > >> the current quota configs. I think we can ev

[jira] [Resolved] (KAFKA-10040) Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-04 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10040.
-
Fix Version/s: 2.7.0
   Resolution: Fixed

> Make computing the PreferredReplicaImbalanceCount metric more efficient
> ---
>
> Key: KAFKA-10040
> URL: https://issues.apache.org/jira/browse/KAFKA-10040
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>
> At the moment, computing the PreferredReplicaImbalanceCount metric traverses 
> all the partitions in the cluster to find out the imbalance ones. This is 
> extremely costly in cluster with large number of partitions and this is done 
> after the processing of each event in the controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-572: Improve timeouts and retires in Kafka Streams

2020-06-04 Thread Matthias J. Sax
Guozhang,

what you propose makes sense, but this seems to get into implementation
detail territory already?

Thus, if there are nor further change requests to the KIP wiki page
itself, I would like to proceed with the VOTE.


-Matthias

On 5/20/20 12:30 PM, Guozhang Wang wrote:
> Thanks Matthias,
> 
> I agree with you on all the bullet points above. Regarding the admin-client
> outer-loop retries inside partition assignor, I think we should treat error
> codes differently from those two blocking calls:
> 
> Describe-topic:
> * unknown-topic (3): add this topic to the to-be-created topic list.
> * leader-not-available (5): do not try to create, retry in the outer loop.
> * request-timeout: break the current loop and retry in the outer loop.
> * others: fatal error.
> 
> Create-topic:
> * topic-already-exists: retry in the outer loop to validate the
> num.partitions match expectation.
> * request-timeout: break the current loop and retry in the outer loop.
> * others: fatal error.
> 
> And in the outer-loop, I think we can have a global timer for the whole
> "assign()" function, not only for the internal-topic-manager, and the timer
> can be hard-coded with, e.g. half of the rebalance.timeout to get rid of
> the `retries`; if we cannot complete the assignment before the timeout runs
> out, we can return just the partial assignment (e.g. if there are two
> tasks, but we can only get the topic metadata for one of them, then just do
> the assignment for that one only) while encoding in the error-code field to
> request for another rebalance.
> 
> Guozhang
> 
> 
> 
> On Mon, May 18, 2020 at 7:26 PM Matthias J. Sax  wrote:
> 
>> No worries Guozhang, any feedback is always very welcome! My reply is
>> going to be a little longer... Sorry.
>>
>>
>>
>>> 1) There are some inconsistent statements in the proposal regarding what
>> to
>>> deprecated:
>>
>> The proposal of the KIP is to deprecate `retries` for producer, admin,
>> and Streams. Maybe the confusion is about the dependency of those
>> settings within Streams and that we handle the deprecation somewhat
>> different for plain clients vs Streams:
>>
>> For plain producer/admin the default `retries` is set to MAX_VALUE. The
>> config will be deprecated but still be respected.
>>
>> For Streams, the default `retries` is set to zero, however, this default
>> retry does _not_ affect the embedded producer/admin clients -- both
>> clients stay on their own default of MAX_VALUES.
>>
>> Currently, this introduces the issue, that if a user wants to increase
>> Streams retries, she might by accident reduce the embedded client
>> retries, too. To avoid this issue, she would need to set
>>
>> retries=100
>> producer.retries=MAX_VALUE
>> admin.retries=MAX_VALUE
>>
>> This KIP will fix this issue only in the long term though, ie, when
>> `retries` is finally removed. Short term, using `retries` in
>> StreamsConfig would still affect the embedded clients, but Streams, as
>> well as both client would log a WARN message. This preserves backward
>> compatibility.
>>
>> Within Streams, `retries` is ignored and the new `task.timeout.ms` is
>> used instead. This increases the default resilience of Kafka Streams
>> itself. We could also achieve this by still respecting `retries` and
>> changing its default value. However, because we deprecate `retries` it
>> seems better to just ignore it and switch to the new config directly.
>>
>> I updated the KIPs with some more details.
>>
>>
>>
>>> 2) We should also document the related behavior change in
>> PartitionAssignor
>>> that uses AdminClient.
>>
>> This is actually a good point. Originally, I looked into this only
>> briefly, but it raised an interesting question on how to handle it.
>>
>> Note that `TimeoutExceptions` are currently not handled in this retry
>> loop. Also note that the default retries value for other errors would be
>> MAX_VALUE by default (inherited from `AdminClient#retries` as mentioned
>> already by Guozhang).
>>
>> Applying the new `task.timeout.ms` config does not seem to be
>> appropriate because the AdminClient is used during a rebalance in the
>> leader. We could introduce a new config just for this case, but it seems
>> to be a little bit too much. Furthermore, the group-coordinator applies
>> a timeout on the leader anyway: if the assignment is not computed within
>> the timeout, the leader is removed from the group and another rebalance
>> is triggered.
>>
>> Overall, we make multiple admin client calls and thus we should keep
>> some retry logic and not just rely on the admin client internal retries,
>> as those would fall short when retrying different calls interleaved. We could
>> just retry infinitely and rely on the group coordinator to remove the
>> leader eventually. However, this does not seem to be ideal because the
>> removed leader might be stuck forever.
>>
>> The question though is: if topic metadata cannot be obtained or internal
>> topics cannot be created, what should we do? We can't compute 

Re: [DISCUSS] KIP-513: Distinguish between Key and Value serdes in scala wrapper library for kafka streams

2020-06-04 Thread Matthias J. Sax
As you say "hierarchy" I read this really as "class hierarchy". For this
case, I think that we need to do it differently.

I agree to this part:

  KeySerde extends Serde
  ValueSerde extends Serde

However,

  KeySerializer extends KeySerde (etc)

does not make sense IMHO, because a `KeySerializer` is not a special
KeySerde; it's only a serializer but it's not a deserializer.

In fact the hierarchy goes the other direction:

  Serde extends Serializer, Deserializer


Atm, a Serde is just a "container" for serializers and desrialzier though.


-Matthias



On 6/2/20 10:40 PM, Yuriy Badalyantc wrote:
> Hi.
> 
> I'm the author of the KIP-616. I got acquainted with the KIP-513 and I
> think overall it's a good idea. But I think workaround only on the scala
> side is not enough. We could consider moving a bit further and change serde
> hierarchy on the java side to something like this:
> 
> KeySerializer-↘
>KeySerde---↘
> KeyDeserializer---↗↓
>Serde
> ValueSerializer---↘↑
>ValueSerde-↗
> ValueDeserializer-↗
> 
> I understand that this is a bit too revolutionary, but I think this is the
> right direction.
> 
> Regarding KIP-616 and KIP-513 connection. It's hard to say at the current
> moment how should we implement these kips: together or separately. It looks
> like there is no consensus on the implementation details for these kips.
> 
> If we decided to create a new `KeySerde` and `ValueSerde` on the scala side
> (current plan without my proposal above), we could use their companions for
> default instances. In this case, it looks like we don't need to do KIP-616.
> But what about backward compatibility? What should we do with
> `org.apache.kafka.streams.scala.Serdes`? Deprecate it?
> 
> - Yuriy
> 
> On Sun, May 31, 2020 at 1:24 AM John Roesler  wrote:
> 
>> Hi Mykhailo,
>>
>> Wow, I really dropped the ball here. I have just looked over your KIP
>> again, and now I see how you don’t need to change every dsl method, only
>> Consumed, Materialized, etc.
>>
>> I think this would be a good addition. Yuriy has just proposed KIP-616 to
>> fix some other problems with the implicit serdes. I’m wondering if these
>> two kips have any joint opportunities we should consider, or if it’s better
>> to continue to consider them separately.
>>
>> Thanks,
>> John
>>
>> On Wed, Jan 22, 2020, at 16:18, Михаил Ерёменко wrote:
>>> Hi, John!
>>>
>>> Sorry for the late reply. I am not really familiar with this mail list
>>> discussions, so I have not seen your mails.
>>>
>>> Regarding your question:
   I guess what
>>>   I'm struggling with is why you actually want to have different key and
>>>   serdes for the same type
>>>
>>> I think good example will be (and it is actually what we do in ours
>>> project) using confluent schema registry in conjunction with kafka
>>> streams. Some models can be used as keys as well as values. When we
>>> define schema registry compatible serde, we have to specify is it for
>>> key or not. We can of course create two serdes for the same model, but
>>> in this case implicit semantic will not work because scala doesn’t know
>>> which implicit to pick. And things become even more complicated in case
>>> if you will try to derive your serdes (we derive serdes in ours
>>> project).
>>>
>>> One more thing:
 every method in the streams-scala DSL.
>>>
>>> So far we've just changed
>>> org.apache.kafka.streams.scala.ImplicitConversions and
>>> org.apache.kafka.streams.scala.kstream.Materialized and it works for
>>> us. Also we did introduce default serdes for primitive types.
>>>
>>> Regards,
>>> Mykhailo
>>
> 



signature.asc
Description: OpenPGP digital signature


Build failed in Jenkins: kafka-trunk-jdk14 #177

2020-06-04 Thread Apache Jenkins Server
See 

Changes:


--
Started by an SCM change
Running as SYSTEM
[EnvInject] - Loading node environment variables.
Building remotely on H23 (ubuntu) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
ERROR: Error cloning remote repo 'origin'
hudson.plugins.git.GitException: Could not init 

at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$5.execute(CliGitAPIImpl.java:916)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$2.execute(CliGitAPIImpl.java:708)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:153)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:146)
at hudson.remoting.UserRequest.perform(UserRequest.java:212)
at hudson.remoting.UserRequest.perform(UserRequest.java:54)
at hudson.remoting.Request$2.run(Request.java:369)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: hudson.remoting.Channel$CallSiteStackTrace: Remote call to 
H23
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1743)
at 
hudson.remoting.UserRequest$ExceptionResponse.retrieve(UserRequest.java:357)
at hudson.remoting.Channel.call(Channel.java:957)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.execute(RemoteGitImpl.java:146)
at sun.reflect.GeneratedMethodAccessor1196.invoke(Unknown 
Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.invoke(RemoteGitImpl.java:132)
at com.sun.proxy.$Proxy137.execute(Unknown Source)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1152)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1192)
at hudson.scm.SCM.checkout(SCM.java:504)
at 
hudson.model.AbstractProject.checkout(AbstractProject.java:1208)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at 
jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1815)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
Caused by: hudson.plugins.git.GitException: Error performing git command
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2181)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2140)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2136)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommand(CliGitAPIImpl.java:1741)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$5.execute(CliGitAPIImpl.java:914)
... 11 more
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at hudson.Proc$LocalProc.(Proc.java:282)
at hudson.Proc$LocalProc.(Proc.java:219)
at hudson.Launcher$LocalLauncher.launch(Launcher.java:937)
at hudson.Launcher$ProcStarter.start(Launcher.java:455)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2168)
... 15 more
ERROR: Error cloning remote repo 'origin'
Retrying after 10 seconds
No credentials specified
 > git rev-parse --is-inside-work-tree # timeout=10
ERROR: Workspace has a .git repository, but it appears to be corrupt.
hudson.plugins.git.GitException: Error performing git command
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchC

Re: [DISCUSS] KIP-619 Deprecate ConsumerConfig#addDeserializerToConfig(Properties, Deserializer, Deserializer) and ProducerConfig#addSerializerToConfig(Properties, Serializer, Serializer)

2020-06-04 Thread Matthias J. Sax
Btw:

I think the KIP is quite straightforward and you could even skip the
DISCUSS and call for a VOTE directly.


-Matthias

On 6/4/20 4:40 PM, Matthias J. Sax wrote:
> @Chia-Ping
> 
> Can you maybe start a new DISCUSS thread using the new KIP number? This
> would help to keep the threads separated.
> 
> Thanks!
> 
> 
> -Matthias
> 
> On 6/3/20 6:56 AM, Chia-Ping Tsai wrote:
>> When I created the KIP, the next number was 619 and not sure why the number 
>> is out of sync.
>>
>> At any rate, I will update the KIP number :_
>>
>> On 2020/06/03 05:06:39, Cheng Tan  wrote: 
>>> Hi Chia, 
>>>
>>> Hope you are doing well. I already took KIP-619 as my KIP identification 
>>> number. Could you change your KIP id? Thank you.
>>>
>>> Best, - Cheng
>>>
 On May 31, 2020, at 8:08 PM, Chia-Ping Tsai  wrote:

 hi All,

 This KIP plans to deprecate two unused methods without replacement.

 All suggestions are welcome!

 KIP: 
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118
 ISSUE: https://issues.apache.org/jira/browse/KAFKA-10044

 ---
 Chia-Ping
>>>
>>>
> 



signature.asc
Description: OpenPGP digital signature


Re: [DISCUSS] KIP-619 Deprecate ConsumerConfig#addDeserializerToConfig(Properties, Deserializer, Deserializer) and ProducerConfig#addSerializerToConfig(Properties, Serializer, Serializer)

2020-06-04 Thread Matthias J. Sax
@Chia-Ping

Can you maybe start a new DISCUSS thread using the new KIP number? This
would help to keep the threads separated.

Thanks!


-Matthias

On 6/3/20 6:56 AM, Chia-Ping Tsai wrote:
> When I created the KIP, the next number was 619 and not sure why the number 
> is out of sync.
> 
> At any rate, I will update the KIP number :_
> 
> On 2020/06/03 05:06:39, Cheng Tan  wrote: 
>> Hi Chia, 
>>
>> Hope you are doing well. I already took KIP-619 as my KIP identification 
>> number. Could you change your KIP id? Thank you.
>>
>> Best, - Cheng
>>
>>> On May 31, 2020, at 8:08 PM, Chia-Ping Tsai  wrote:
>>>
>>> hi All,
>>>
>>> This KIP plans to deprecate two unused methods without replacement.
>>>
>>> All suggestions are welcome!
>>>
>>> KIP: 
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118
>>> ISSUE: https://issues.apache.org/jira/browse/KAFKA-10044
>>>
>>> ---
>>> Chia-Ping
>>
>>



signature.asc
Description: OpenPGP digital signature


[jira] [Resolved] (KAFKA-10069) The user-defined "predicate" and "negate" are not removed from Transformation

2020-06-04 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-10069.

Resolution: Fixed

> The user-defined "predicate" and "negate" are not removed from Transformation
> -
>
> Key: KAFKA-10069
> URL: https://issues.apache.org/jira/browse/KAFKA-10069
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.6.0
>
>
> There are official configDef for both "predicate" and "negate" so we should 
> remove user-defined configDef. However, current behavior does incorrect 
> comparison so the duplicate key will destroy the following embed configDef.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10104) Remove deprecated --zookeeper flags as specified in KIP-604

2020-06-04 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-10104:


 Summary: Remove deprecated --zookeeper flags as specified in 
KIP-604
 Key: KAFKA-10104
 URL: https://issues.apache.org/jira/browse/KAFKA-10104
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


Remove deprecated --zookeeper flags as specified in KIP-604



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-06-04 Thread Boyang Chen
Hey there,

bumping this thread for any further KIP-590 discussion, since it's been
quiet for a while.

Boyang

On Thu, May 21, 2020 at 10:31 AM Boyang Chen 
wrote:

> Thanks David, I agree the wording here is not clear, and the fellow broker
> should just send a new CreateTopicRequest in this case.
>
> In the meantime, we had some offline discussion about the Envelope API as
> well. Although it provides certain privileges like data embedding and
> principal embedding, it creates a security hole by letting a malicious user
> impersonate any forwarding broker, thus pretending to be any admin user.
> Passing the principal around also increases the vulnerability, compared
> with other standard ways such as passing a verified token, but it is
> unfortunately not fully supported with Kafka security.
>
> So for the security concerns, we are abandoning the Envelope approach and
> fallback to just forward the raw admin requests. The authentication will
> happen on the receiving broker side instead of on the controller, so that
> we are able to strip off the principal fields and only include the
> principal in header as optional field for audit logging purpose.
> Furthermore, we shall propose adding a separate endpoint for
> broker-controller communication which should be recommended to enable
> secure connections so that a malicious client could not pretend to be a
> broker and perform impersonation.
>
> Let me know your thoughts.
>
> Best,
> Boyang
>
> On Tue, May 19, 2020 at 12:17 AM David Jacot  wrote:
>
>> Hi Boyang,
>>
>> I've got another question regarding the auto topic creation case. The KIP
>> says: "Currently the target broker shall just utilize its own ZK client to
>> create
>> internal topics, which is disallowed in the bridge release. For above
>> scenarios,
>> non-controller broker shall just forward a CreateTopicRequest to the
>> controller
>> instead and let controller take care of the rest, while waiting for the
>> response
>> in the meantime." There will be no request to forward in this case, right?
>> Instead,
>> a CreateTopicsRequest is created and sent to the controller node.
>>
>> When the CreateTopicsRequest is sent as a side effect of the
>> MetadataRequest,
>> it would be good to know the principal and the clientId in the controller
>> (quota,
>> audit, etc.). Do you plan to use the Envelope API for this case as well or
>> to call
>> the regular API directly? Another way to phrase it would be: Shall the
>> internal
>> CreateTopicsRequest be sent with the original metadata (principal,
>> clientId, etc.)
>> of the MetadataRequest or as an admin request?
>>
>> Best,
>> David
>>
>> On Fri, May 8, 2020 at 2:04 AM Guozhang Wang  wrote:
>>
>> > Just to adds a bit more FYI here related to the last question from
>> David:
>> > in KIP-595 while implementing the new requests we are also adding a
>> > "KafkaNetworkChannel" which is used for brokers to send vote / fetch
>> > records, so besides the discussion on listeners I think implementation
>> wise
>> > we can also consider consolidating a lot of those into the same
>> call-trace
>> > as well -- of course this is not related to public APIs so maybe just
>> needs
>> > to be coordinated among developments:
>> >
>> > 1. Broker -> Controller: ISR Change, Topic Creation, Admin Redirect
>> > (KIP-497).
>> > 2. Controller -> Broker: LeaderAndISR / MetadataUpdate; though these are
>> > likely going to be deprecated post KIP-500.
>> > 3. Txn Coordinator -> Broker: TxnMarker
>> >
>> >
>> > Guozhang
>> >
>> > On Wed, May 6, 2020 at 8:58 PM Boyang Chen 
>> > wrote:
>> >
>> > > Hey David,
>> > >
>> > > thanks for the feedbacks!
>> > >
>> > > On Wed, May 6, 2020 at 2:06 AM David Jacot 
>> wrote:
>> > >
>> > > > Hi Boyang,
>> > > >
>> > > > While re-reading the KIP, I've got few small questions/comments:
>> > > >
>> > > > 1. When auto topic creation is enabled, brokers will send a
>> > > > CreateTopicRequest
>> > > > to the controller instead of writing to ZK directly. It means that
>> > > > creation of these
>> > > > topics are subject to be rejected with an error if a
>> CreateTopicPolicy
>> > is
>> > > > used. Today,
>> > > > it bypasses the policy entirely. I suppose that clusters allowing
>> auto
>> > > > topic creation
>> > > > don't have a policy in place so it is not a big deal. I suggest to
>> call
>> > > > out explicitly the
>> > > > limitation in the KIP though.
>> > > >
>> > > > That's a good idea, will add to the KIP.
>> > >
>> > >
>> > > > 2. In the same vein as my first point. How do you plan to handle
>> errors
>> > > > when internal
>> > > > topics are created by a broker? Do you plan to retry retryable
>> errors
>> > > > indefinitely?
>> > > >
>> > > > I checked a bit on the admin client handling of the create topic
>> RPC.
>> > It
>> > > seems that
>> > > the only retriable exceptions at the moment are NOT_CONTROLLER and
>> > > REQUEST_TIMEOUT.
>> > > So I guess we just need to retry on these exceptions?
>> > >
>> > >
>> > > > 3. Could you c

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-06-04 Thread Satish Duggana
Hi Jun,
Please let us know if you have any comments on "transactional support"
and "follower requests/replication" mentioned in the wiki.

Thanks,
Satish.

On Tue, Jun 2, 2020 at 9:25 PM Satish Duggana  wrote:
>
> Thanks Jun for your comments.
>
> >100. It would be useful to provide more details on how those apis are used. 
> >Otherwise, it's kind of hard to really assess whether the new apis are 
> >sufficient/redundant. A few examples below.
>
> We will update the wiki and let you know.
>
> >100.1 deleteRecords seems to only advance the logStartOffset in Log. How 
> >does that trigger the deletion of remote log segments?
>
> RLMTask for leader partition periodically checks whether there are
> remote log segments earlier to logStartOffset and the respective
> remote log segment metadata and data are deleted by using RLMM and
> RSM.
>
> >100.2 stopReplica with deletion is used in 2 cases (a) replica reassignment; 
> >(b) topic deletion. We only want to delete the tiered metadata in the second 
> >case. Also, in the second case, who initiates the deletion of the remote 
> >segment since the leader may not exist?
>
> Right, it is deleted only in the case of topic deletion. We will cover
> the details in the KIP.
>
> >100.3 "LogStartOffset of a topic can be either in local or in remote 
> >storage." If LogStartOffset exists in both places, which one is the source 
> >of truth?
>
> I meant the logStartOffset can point to either a local segment or a
> remote segment, but it is initialised and maintained in the Log class
> as it is now.
>
> >100.4 List listRemoteLogSegments(TopicPartition 
> >topicPartition, long minOffset): How is minOffset supposed to be used?
>
> Returns list of remote segments, sorted by baseOffset in ascending
> order that have baseOffset >= the given min Offset.
>
> >100.5 When copying a segment to remote storage, it seems we are calling the 
> >same RLMM.putRemoteLogSegmentData() twice before and after copyLogSegment(). 
> >Could you explain why?
>
> This is more about prepare/commit/rollback as you suggested.  We will
> update the wiki with the new APIs.
>
> >100.6 LogSegmentData includes leaderEpochCache, but there is no api in 
> >RemoteStorageManager to retrieve it.
>
> Nice catch, copy/paste issue. There is an API to retrieve it.
>
> >101. If the __remote_log_metadata is for production usage, could you provide 
> >more details? For example, what is the schema of the data (both key and 
> >value)? How is the topic maintained,delete or compact?
>
> It is a topic with the delete cleanup policy, and its retention period is
> suggested to be more than the remote retention period.
>
> >110. Is the cache implementation in RemoteLogMetadataManager meant for 
> >production usage? If so, could you provide more details on the schema and 
> >how/where the data is stored?
>
> The proposal is to have a cache (with default implementation backed by
> rocksdb) but it will be added in later versions. We will add this to
> future work items.
>
> >111. "Committed offsets can be stored in a local file". Could you describe 
> >the format of the file and where it's stored?
>
> We will cover this in the KIP.
>
> >112. Truncation of remote segments under unclean leader election: I am not 
> >sure who figures out the truncated remote segments and how that information 
> >is propagated to all replicas?
>
> We will add this in detail in the KIP.
>
> >113. "If there are any failures in removing remote log segments then those 
> >are stored in a specific topic (default as 
> >__remote_segments_to_be_deleted)". Is it necessary to add yet another 
> >internal topic? Could we just keep retrying?
>
> This is not really an internal topic, it will be exposed as a user
> configurable topic. After a few retries, we want user to know about
> the failure so that they can take an action later by consuming from
> this topic. We want to keep this simple instead of retrying
> continuously and maintaining the deletion state etc.
>
> >114. "We may not need to copy producer-id-snapshot as we are copying only 
> >segments earlier to last-stable-offset." Hmm, not sure about that. The 
> >producer snapshot includes things like the last timestamp of each open 
> >producer id and can affect when those producer ids are expired.
>
> Sure, this will be added as part of the LogSegmentData.
>
> Thanks,
> Satish.
>
>
> On Fri, May 29, 2020 at 6:39 AM Jun Rao  wrote:
> >
> > Hi, Satish,
> >
> > Made another pass on the wiki. A few more comments below.
> >
> > 100. It would be useful to provide more details on how those apis are used. 
> > Otherwise, it's kind of hard to really assess whether the new apis are 
> > sufficient/redundant. A few examples below.
> > 100.1 deleteRecords seems to only advance the logStartOffset in Log. How 
> > does that trigger the deletion of remote log segments?
> > 100.2 stopReplica with deletion is used in 2 cases (a) replica 
> > reassignment; (b) topic deletion. We only want to delete the tiered 
> > metadata in the second case. Also,

Jenkins build is back to normal : kafka-2.6-jdk8 #18

2020-06-04 Thread Apache Jenkins Server
See 




Re: Please I Want to Subscribe

2020-06-04 Thread Ricardo Ferreira

Thanks for the heads up!

Yeah, I saw the right email but ended up sending the subscribe request 
for the wrong one =)


Everything is fixed now.

Thanks,

-- Ricardo

On 6/4/20 12:40 PM, Guozhang Wang wrote:

Hello Ricardo,

Here you go for the guidelines, it is self-service:
https://kafka.apache.org/contact


Guozhang

On Wed, Jun 3, 2020 at 1:09 PM Ricardo Ferreira 
wrote:


Please I Want to Subscribe




[jira] [Created] (KAFKA-10103) JDBC Sink Connector doesn't support numerical values in event keys

2020-06-04 Thread Jakub (Jira)
Jakub created KAFKA-10103:
-

 Summary: JDBC Sink Connector doesn't support numerical values in 
event keys
 Key: KAFKA-10103
 URL: https://issues.apache.org/jira/browse/KAFKA-10103
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.1
Reporter: Jakub


Our topics contain events with numerical keys and Avro values. We're trying to 
configure a JDBC connector to export data from these topics to Oracle DB, but 
it doesn't seem to work.

If we use strings as keys everything works fine, but if we switch to Longs it 
stops working. We tried different values of _key.converter_, including 
_org.apache.kafka.connect.converters.ByteArrayConverter,_ but either parsing of 
keys doesn't work, or they cannot be mapped to Oracle data type (NUMBER - this 
happens if we use _ByteArrayConverter)._ 

We also tried using transformations (CAST), but in that case we're getting 
_Cast transformation does not support casting to/from BYTES_

Please excuse if this is not a bug and there is a way to work with numerical 
keys, we just couldn't find anything about that in the documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Please I Want to Subscribe

2020-06-04 Thread Guozhang Wang
Hello Ricardo,

Here you go for the guidelines, it is self-service:
https://kafka.apache.org/contact


Guozhang

On Wed, Jun 3, 2020 at 1:09 PM Ricardo Ferreira 
wrote:

> Please I Want to Subscribe
>
>

-- 
-- Guozhang


[jira] [Created] (KAFKA-10102) Source node references not updated after rebuilding topology

2020-06-04 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10102:
---

 Summary: Source node references not updated after rebuilding 
topology
 Key: KAFKA-10102
 URL: https://issues.apache.org/jira/browse/KAFKA-10102
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman
 Fix For: 2.6.0


Luckily this bug was caught by 
RegexSourceIntegrationTest#testRegexRecordsAreProcessedAfterReassignment – we 
saw it fail with an NPE during SourceNode#deserializeKey, implying that the key 
deserializer was null, which in turn implies that the source node was never 
initialized.

This can happen when a task is updated with new regex matched topic partitions. 
In order to update the topology with the new input partitions, we actually just 
create an entirely new topology from scratch including building new source node 
objects. We then re/initialize this new topology once the task is resumed.

The problem is that the task's RecordQueues save a reference to the 
corresponding source node, and use this to pass polled records into the 
topology. But the RecordQueues aren't updated with the newly built source nodes 
and still point to the original nodes.

If the task had not completed restoration before being updated with new 
partitions, it would never have initialized the original topology or source 
nodes, resulting in an NPE when the RecordQueue passes a record to the old, 
uninitialized source node.

This is the only specific known bug, but I haven't checked the entire code base 
so it's possible there are other node references saved that might result in 
bugs. We should try to avoid rebuilding an entirely new topology if at all 
possible, and see if we can instead update the input partitions only where 
necessary.
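
To make the failure mode concrete, here is a minimal, self-contained sketch of the stale-reference pattern described above. The classes are simplified stand-ins for illustration only, not the actual Kafka Streams internals:

// Minimal stand-alone illustration of the stale-reference pattern described above.
// These classes are simplified stand-ins, NOT the real Kafka Streams internals.
import java.util.Objects;

class FakeSourceNode {
    private Object keyDeserializer;                       // stays null until init() runs
    void init() { keyDeserializer = new Object(); }
    void process(byte[] key) {
        Objects.requireNonNull(keyDeserializer, "source node was never initialized"); // NPE here
    }
}

class FakeRecordQueue {
    private final FakeSourceNode source;                  // reference captured once, never refreshed
    FakeRecordQueue(FakeSourceNode source) { this.source = source; }
    void poll(byte[] key) { source.process(key); }
}

public class StaleSourceNodeDemo {
    public static void main(String[] args) {
        FakeSourceNode original = new FakeSourceNode();   // task still restoring, so never initialized
        FakeRecordQueue queue = new FakeRecordQueue(original);

        FakeSourceNode rebuilt = new FakeSourceNode();    // topology rebuilt for new input partitions
        rebuilt.init();                                   // only the new node is initialized

        queue.poll(new byte[0]);                          // NPE: the queue still points at the old node
    }
}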



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk14 #172

2020-06-04 Thread Apache Jenkins Server
See 

Changes:


--
Started by an SCM change
Running as SYSTEM
[EnvInject] - Loading node environment variables.
Building remotely on H23 (ubuntu) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
ERROR: Error cloning remote repo 'origin'
hudson.plugins.git.GitException: Could not init 

at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$5.execute(CliGitAPIImpl.java:916)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$2.execute(CliGitAPIImpl.java:708)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:153)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:146)
at hudson.remoting.UserRequest.perform(UserRequest.java:212)
at hudson.remoting.UserRequest.perform(UserRequest.java:54)
at hudson.remoting.Request$2.run(Request.java:369)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: hudson.remoting.Channel$CallSiteStackTrace: Remote call to 
H23
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1743)
at 
hudson.remoting.UserRequest$ExceptionResponse.retrieve(UserRequest.java:357)
at hudson.remoting.Channel.call(Channel.java:957)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.execute(RemoteGitImpl.java:146)
at sun.reflect.GeneratedMethodAccessor1196.invoke(Unknown 
Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.invoke(RemoteGitImpl.java:132)
at com.sun.proxy.$Proxy137.execute(Unknown Source)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1152)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1192)
at hudson.scm.SCM.checkout(SCM.java:504)
at 
hudson.model.AbstractProject.checkout(AbstractProject.java:1208)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at 
jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1815)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
Caused by: hudson.plugins.git.GitException: Error performing git command
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2181)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2140)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2136)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommand(CliGitAPIImpl.java:1741)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$5.execute(CliGitAPIImpl.java:914)
... 11 more
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at 
java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
at 
java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at hudson.Proc.joinWithTimeout(Proc.java:158)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2168)
... 15 more
ERROR: Error cloning remote repo 'origin'
Retrying after 10 seconds
No credentials specified
 > git rev-parse --is-inside-work-tree # timeout=10
ERROR: Workspace has a .git repository, but it appears to be corrupt.
hudson.plugins.git.GitE

[jira] [Resolved] (KAFKA-9576) Topic creation failure causing Stream thread death

2020-06-04 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9576.

Resolution: Duplicate

> Topic creation failure causing Stream thread death
> --
>
> Key: KAFKA-9576
> URL: https://issues.apache.org/jira/browse/KAFKA-9576
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Priority: Major
>
> The failure to create an internal topic could lead to the stream thread death 
> due to timeout:
> {code:java}
> [2020-02-14T03:03:00-08:00] 
> (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 
> 11:03:00,083] ERROR 
> [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] 
> stream-thread [main] Unexpected error during topic description for 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog.
> [2020-02-14T03:03:00-08:00] 
> (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 
> 11:03:00,083] ERROR 
> [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] 
> stream-thread 
> [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-02-14T03:03:00-08:00] 
> (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: Could not create topic 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog.
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:209)
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:223)
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:106)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.p

[jira] [Created] (KAFKA-10101) recovery point is advanced without flushing the data after recovery

2020-06-04 Thread Jun Rao (Jira)
Jun Rao created KAFKA-10101:
---

 Summary: recovery point is advanced without flushing the data 
after recovery
 Key: KAFKA-10101
 URL: https://issues.apache.org/jira/browse/KAFKA-10101
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.5.0
Reporter: Jun Rao


Currently, in Log.recoverLog(), we set recoveryPoint to logEndOffset after 
recovering the log segments. However, we don't flush the log segments after 
recovery. The potential issue is that if the broker has another hard failure, 
segments may be corrupted on disk but won't go through recovery on 
another restart.

This logic was introduced in KAFKA-5829 and has been present since 1.0.0.
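
A simplified sketch of the ordering concern (hypothetical code, not the actual kafka.log.Log implementation), where the recovery point only advances past data that has actually been flushed:

// Hypothetical, simplified sketch of the ordering discussed above; not the real kafka.log.Log code.
class RecoverableLog {
    private long recoveryPoint;
    private long logEndOffset;

    /** Re-validates segments after an unclean shutdown. */
    void recoverLog() {
        // ... truncate invalid data, rebuild indexes, update logEndOffset ...

        // Current (problematic) order: the recovery point jumps ahead without a flush,
        // so a second hard failure can leave corrupted segments that are never re-validated.
        // recoveryPoint = logEndOffset;

        // Safer order: flush the recovered segments first, then advance the recovery point.
        flushSegmentsUpTo(logEndOffset);
        recoveryPoint = logEndOffset;
    }

    private void flushSegmentsUpTo(long offset) {
        // fsync all segment and index files containing data below `offset`
    }
}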



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-04 Thread Mickael Maison
+1 (binding)
Thanks David for looking into this important issue

On Thu, Jun 4, 2020 at 3:59 PM Tom Bentley  wrote:
>
> +1 (non binding).
>
> Thanks!
>
> On Wed, Jun 3, 2020 at 3:51 PM Rajini Sivaram 
> wrote:
>
> > +1 (binding)
> >
> > Thanks for the KIP, David!
> >
> > Regards,
> >
> > Rajini
> >
> >
> > On Sun, May 31, 2020 at 3:29 AM Gwen Shapira  wrote:
> >
> > > +1 (binding)
> > >
> > > Looks great. Thank you for the in-depth design and discussion.
> > >
> > > On Fri, May 29, 2020 at 7:58 AM David Jacot  wrote:
> > >
> > > > Hi folks,
> > > >
> > > > I'd like to start the vote for KIP-599 which proposes a new quota to
> > > > throttle create topic, create partition, and delete topics operations
> > to
> > > > protect the Kafka controller:
> > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-599%3A+Throttle+Create+Topic%2C+Create+Partition+and+Delete+Topic+Operations
> > > >
> > > > Please, let me know what you think.
> > > >
> > > > Cheers,
> > > > David
> > > >
> > >
> > >
> > > --
> > > Gwen Shapira
> > > Engineering Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> > >
> >


Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-04 Thread David Jacot
Hi all,

I just published an updated version of the KIP which includes:
* Using a slightly modified version of our Rate. I have tried to formalize
it based on our discussion. As Anna suggested, we may find a better way to
implement it.
* Handling of ValidateOnly as pointed out by Tom.

Please, check it out and let me know what you think.

Best,
David

On Thu, Jun 4, 2020 at 4:57 PM Tom Bentley  wrote:

> Hi David,
>
> As a user I might expect the validateOnly option to do everything except
> actually make the changes. That interpretation would imply the quota should
> be checked, but the check should obviously be side-effect free. I think
> this interpretation could be useful because it gives the caller either some
> confidence that they're not going to hit the quota, or tell them, via the
> exception, when they can expect the call to work. But for this to be useful
> it would require the retry logic to not retry the request when validateOnly
> was set.
>
> On the other hand, if validateOnly is really about validating only some
> aspects of the request (which maybe is what the name implies), then we
> should clarify in the Javadoc that the quota is not included in the
> validation.
>
> On balance, I agree with what you're proposing, since the extra utility of
> including the quota in the validation seems to be not worth the extra
> complication for the retry.
>
> Thanks,
>
> Tom
>
>
>
> On Thu, Jun 4, 2020 at 3:32 PM David Jacot  wrote:
>
> > Hi Tom,
> >
> > That's a good question. As the validation does not create any load on the
> > controller, I was planning to do it without checking the quota at all.
> Does
> > that
> > sound reasonable?
> >
> > Best,
> > David
> >
> > On Thu, Jun 4, 2020 at 4:23 PM David Jacot  wrote:
> >
> > > Hi Jun and Anna,
> > >
> > > Thank you both for your replies.
> > >
> > > Based on our recent discussion, I agree that using a rate is better to
> > > remain
> > > consistent with other quotas. As you both suggested, it seems that
> > changing
> > > the way we compute the rate to better handle spiky workloads and
> behave a
> > > bit more similarly to the token bucket algorithm makes sense for all
> > > quotas as
> > > well.
> > >
> > > I will update the KIP to reflect this.
> > >
> > > Anna, I think that we can explain this in this KIP. We can't just say
> > that
> > > the Rate
> > > will be updated in this KIP. I think that we need to give a bit more
> > info.
> > >
> > > Best,
> > > David
> > >
> > > On Thu, Jun 4, 2020 at 6:31 AM Anna Povzner  wrote:
> > >
> > >> Hi Jun and David,
> > >>
> > >> Regarding token bucket vs, Rate behavior. We recently observed a
> couple
> > of
> > >> cases where a bursty workload behavior would result in long-ish pauses
> > in
> > >> between, resulting in lower overall bandwidth than the quota. I will
> > need
> > >> to debug this a bit more to be 100% sure, but it does look like the
> case
> > >> described by David earlier in this thread. So, I agree with Jun -- I
> > think
> > >> we should make all quota rate behavior consistent, and probably
> similar
> > to
> > >> the token bucket one.
> > >>
> > >> Looking at KIP-13, it doesn't describe rate calculation in enough
> > detail,
> > >> but does mention window size. So, we could keep "window size" and
> > "number
> > >> of samples" configs and change Rate implementation to be more similar
> to
> > >> token bucket:
> > >> * number of samples define our burst size
> > >> * Change the behavior, which could be described as: If a burst happens
> > >> after an idle period, the burst would effectively spread evenly over
> the
> > >> (now - window) time period, where window is (<number of samples> - 1) *
> > >> <window size>. Which basically describes a token bucket, while keeping
> > the
> > >> current quota configs. I think we can even implement this by changing
> > the
> > >> way we record the last sample or lastWindowMs.
> > >>
> > >> Jun, if we would be changing Rate calculation behavior in bandwidth
> and
> > >> request quotas, would we need a separate KIP? Shouldn't need to if we
> > >> keep window size and number of samples configs, right?
> > >>
> > >> Thanks,
> > >> Anna
> > >>
> > >> On Wed, Jun 3, 2020 at 3:24 PM Jun Rao  wrote:
> > >>
> > >> > Hi, David,
> > >> >
> > >> > Thanks for the reply.
> > >> >
> > >> > 11. To match the behavior in the Token bucket approach, I was
> thinking
> > >> that
> > >> > requests that don't fit in the previous time windows will be
> > >> accumulated in
> > >> > the current time window. So, the 60 extra requests will be
> accumulated
> > >> in
> > >> > the latest window. Then, the client also has to wait for 12 more
> secs
> > >> > before throttling is removed. I agree that this is probably a better
> > >> > behavior and it's reasonable to change the existing behavior to this
> > >> one.
> > >> >
> > >> > To me, it seems that sample_size * num_windows is the same as max
> > burst
> > >> > balance. The latter seems a bit better to configure. The thing is
> that
> > >> the
> > >> > existing 

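For readers following the thread above, a generic token-bucket sketch (illustration only; not Kafka's Rate implementation and not the KIP's final algorithm): credits accumulate during idle periods up to a maximum burst, and usage beyond the accumulated balance translates into a throttle delay.

// Generic token-bucket sketch for illustration only; not Kafka's Rate implementation.
class TokenBucket {
    private final double ratePerSec;   // steady-state quota, e.g. mutations per second
    private final double maxBurst;     // maximum accumulated credit, e.g. rate * window
    private double tokens;
    private long lastUpdateMs;

    TokenBucket(double ratePerSec, double maxBurst, long nowMs) {
        this.ratePerSec = ratePerSec;
        this.maxBurst = maxBurst;
        this.tokens = maxBurst;        // start full: an initial burst is allowed
        this.lastUpdateMs = nowMs;
    }

    /** Records `amount` of usage and returns the throttle delay in ms (0 if within quota). */
    long record(double amount, long nowMs) {
        refill(nowMs);
        tokens -= amount;              // the balance may go negative for a large burst
        if (tokens >= 0) return 0;
        // Wait until enough tokens accumulate to bring the balance back to zero.
        return (long) Math.ceil((-tokens / ratePerSec) * 1000.0);
    }

    private void refill(long nowMs) {
        double elapsedSec = (nowMs - lastUpdateMs) / 1000.0;
        tokens = Math.min(maxBurst, tokens + elapsedSec * ratePerSec);
        lastUpdateMs = nowMs;
    }
}

public class TokenBucketSketch {
    public static void main(String[] args) {
        // Quota of 5 ops/sec with a burst of up to 50 ops (e.g. rate times a 10s window).
        TokenBucket bucket = new TokenBucket(5.0, 50.0, 0L);
        System.out.println(bucket.record(30.0, 0L));   // 0 ms: burst fits in the accumulated credit
        System.out.println(bucket.record(30.0, 0L));   // 2000 ms: balance is now -10, 10 / 5 = 2s wait
    }
}
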
Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-04 Thread Tom Bentley
+1 (non binding).

Thanks!

On Wed, Jun 3, 2020 at 3:51 PM Rajini Sivaram 
wrote:

> +1 (binding)
>
> Thanks for the KIP, David!
>
> Regards,
>
> Rajini
>
>
> On Sun, May 31, 2020 at 3:29 AM Gwen Shapira  wrote:
>
> > +1 (binding)
> >
> > Looks great. Thank you for the in-depth design and discussion.
> >
> > On Fri, May 29, 2020 at 7:58 AM David Jacot  wrote:
> >
> > > Hi folks,
> > >
> > > I'd like to start the vote for KIP-599 which proposes a new quota to
> > > throttle create topic, create partition, and delete topics operations
> to
> > > protect the Kafka controller:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-599%3A+Throttle+Create+Topic%2C+Create+Partition+and+Delete+Topic+Operations
> > >
> > > Please, let me know what you think.
> > >
> > > Cheers,
> > > David
> > >
> >
> >
> > --
> > Gwen Shapira
> > Engineering Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>


Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-04 Thread Tom Bentley
Hi David,

As a user I might expect the validateOnly option to do everything except
actually make the changes. That interpretation would imply the quota should
be checked, but the check should obviously be side-effect free. I think
this interpretation could be useful because it gives the caller either some
confidence that they're not going to hit the quota, or tells them, via the
exception, when they can expect the call to work. But for this to be useful
it would require the retry logic to not retry the request when validateOnly
was set.

On the other hand, if validateOnly is really about validating only some
aspects of the request (which maybe is what the name implies), then we
should clarify in the Javadoc that the quota is not included in the
validation.

On balance, I agree with what you're proposing, since the extra utility of
including the quota in the validation seems to be not worth the extra
complication for the retry.

Thanks,

Tom



On Thu, Jun 4, 2020 at 3:32 PM David Jacot  wrote:

> Hi Tom,
>
> That's a good question. As the validation does not create any load on the
> controller, I was planning to do it without checking the quota at all. Does
> that
> sound reasonable?
>
> Best,
> David
>
> On Thu, Jun 4, 2020 at 4:23 PM David Jacot  wrote:
>
> > Hi Jun and Anna,
> >
> > Thank you both for your replies.
> >
> > Based on our recent discussion, I agree that using a rate is better to
> > remain
> > consistent with other quotas. As you both suggested, it seems that
> changing
> > the way we compute the rate to better handle spiky workloads and behave a
> > bit more similarly to the token bucket algorithm makes sense for all
> > quotas as
> > well.
> >
> > I will update the KIP to reflect this.
> >
> > Anna, I think that we can explain this in this KIP. We can't just say
> that
> > the Rate
> > will be updated in this KIP. I think that we need to give a bit more
> info.
> >
> > Best,
> > David
> >
> > On Thu, Jun 4, 2020 at 6:31 AM Anna Povzner  wrote:
> >
> >> Hi Jun and David,
> >>
> >> Regarding token bucket vs, Rate behavior. We recently observed a couple
> of
> >> cases where a bursty workload behavior would result in long-ish pauses
> in
> >> between, resulting in lower overall bandwidth than the quota. I will
> need
> >> to debug this a bit more to be 100% sure, but it does look like the case
> >> described by David earlier in this thread. So, I agree with Jun -- I
> think
> >> we should make all quota rate behavior consistent, and probably similar
> to
> >> the token bucket one.
> >>
> >> Looking at KIP-13, it doesn't describe rate calculation in enough
> detail,
> >> but does mention window size. So, we could keep "window size" and
> "number
> >> of samples" configs and change Rate implementation to be more similar to
> >> token bucket:
> >> * number of samples define our burst size
> >> * Change the behavior, which could be described as: If a burst happens
> >> after an idle period, the burst would effectively spread evenly over the
> >> (now - window) time period, where window is (<number of samples> - 1) *
> >> <window size>. Which basically describes a token bucket, while keeping
> the
> >> current quota configs. I think we can even implement this by changing
> the
> >> way we record the last sample or lastWindowMs.
> >>
> >> Jun, if we would be changing Rate calculation behavior in bandwidth and
> >> request quotas, would we need a separate KIP? Shouldn't need to if we
> >> keep window size and number of samples configs, right?
> >>
> >> Thanks,
> >> Anna
> >>
> >> On Wed, Jun 3, 2020 at 3:24 PM Jun Rao  wrote:
> >>
> >> > Hi, David,
> >> >
> >> > Thanks for the reply.
> >> >
> >> > 11. To match the behavior in the Token bucket approach, I was thinking
> >> that
> >> > requests that don't fit in the previous time windows will be
> >> accumulated in
> >> > the current time window. So, the 60 extra requests will be accumulated
> >> in
> >> > the latest window. Then, the client also has to wait for 12 more secs
> >> > before throttling is removed. I agree that this is probably a better
> >> > behavior and it's reasonable to change the existing behavior to this
> >> one.
> >> >
> >> > To me, it seems that sample_size * num_windows is the same as max
> burst
> >> > balance. The latter seems a bit better to configure. The thing is that
> >> the
> >> > existing quota system has already been used in quite a few places and
> >> if we
> >> > want to change the configuration in the future, there is the migration
> >> > cost. Given that, do you feel it's better to adopt the  new token
> bucket
> >> > terminology or just adopt the behavior somehow into our existing
> >> system? If
> >> > it's the former, it would be useful to document this in the rejected
> >> > section and add a future plan on migrating existing quota
> >> configurations.
> >> >
> >> > Jun
> >> >
> >> >
> >> > On Tue, Jun 2, 2020 at 6:55 AM David Jacot 
> wrote:
> >> >
> >> > > Hi Jun,
> >> > >
> >> > > Thanks for your reply.
> >> > >
> >> > > 10. I think that

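The validateOnly discussion above concerns the client-facing admin API. Here is a hedged sketch of the caller's view (bootstrap address and topic are hypothetical); per the proposal above, a request with validateOnly set is validated but not actually applied, and would not be counted against the controller mutation quota:

// Sketch of a validateOnly topic creation from the client side; connection details are hypothetical.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

public class ValidateOnlyExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical

        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 1);           // hypothetical topic
            // validateOnly(true): the broker validates the request but does not create the topic.
            CreateTopicsOptions options = new CreateTopicsOptions().validateOnly(true);
            admin.createTopics(List.of(topic), options).all().get();
        }
    }
}
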
Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-04 Thread David Jacot
Hi Tom,

That's a good question. As the validation does not create any load on the
controller, I was planning to do it without checking the quota at all. Does
that
sound reasonable?

Best,
David

On Thu, Jun 4, 2020 at 4:23 PM David Jacot  wrote:

> Hi Jun and Anna,
>
> Thank you both for your replies.
>
> Based on our recent discussion, I agree that using a rate is better to
> remain
> consistent with other quotas. As you both suggested, it seems that changing
> the way we compute the rate to better handle spiky workloads and behave a
> bit more similarly to the token bucket algorithm makes sense for all
> quotas as
> well.
>
> I will update the KIP to reflect this.
>
> Anna, I think that we can explain this in this KIP. We can't just say that
> the Rate
> will be updated in this KIP. I think that we need to give a bit more info.
>
> Best,
> David
>
> On Thu, Jun 4, 2020 at 6:31 AM Anna Povzner  wrote:
>
>> Hi Jun and David,
>>
>> Regarding token bucket vs, Rate behavior. We recently observed a couple of
>> cases where a bursty workload behavior would result in long-ish pauses in
>> between, resulting in lower overall bandwidth than the quota. I will need
>> to debug this a bit more to be 100% sure, but it does look like the case
>> described by David earlier in this thread. So, I agree with Jun -- I think
>> we should make all quota rate behavior consistent, and probably similar to
>> the token bucket one.
>>
>> Looking at KIP-13, it doesn't describe rate calculation in enough detail,
>> but does mention window size. So, we could keep "window size" and "number
>> of samples" configs and change Rate implementation to be more similar to
>> token bucket:
>> * number of samples define our burst size
>> * Change the behavior, which could be described as: If a burst happens
>> after an idle period, the burst would effectively spread evenly over the
>> (now - window) time period, where window is (<number of samples> - 1) *
>> <window size>. Which basically describes a token bucket, while keeping the
>> current quota configs. I think we can even implement this by changing the
>> way we record the last sample or lastWindowMs.
>>
>> Jun, if we would be changing Rate calculation behavior in bandwidth and
>> request quotas, would we need a separate KIP? Shouldn't need to if we
>> keep window size and number of samples configs, right?
>>
>> Thanks,
>> Anna
>>
>> On Wed, Jun 3, 2020 at 3:24 PM Jun Rao  wrote:
>>
>> > Hi, David,
>> >
>> > Thanks for the reply.
>> >
>> > 11. To match the behavior in the Token bucket approach, I was thinking
>> that
>> > requests that don't fit in the previous time windows will be
>> accumulated in
>> > the current time window. So, the 60 extra requests will be accumulated
>> in
>> > the latest window. Then, the client also has to wait for 12 more secs
>> > before throttling is removed. I agree that this is probably a better
>> > behavior and it's reasonable to change the existing behavior to this
>> one.
>> >
>> > To me, it seems that sample_size * num_windows is the same as max burst
>> > balance. The latter seems a bit better to configure. The thing is that
>> the
>> > existing quota system has already been used in quite a few places and
>> if we
>> > want to change the configuration in the future, there is the migration
>> > cost. Given that, do you feel it's better to adopt the  new token bucket
>> > terminology or just adopt the behavior somehow into our existing
>> system? If
>> > it's the former, it would be useful to document this in the rejected
>> > section and add a future plan on migrating existing quota
>> configurations.
>> >
>> > Jun
>> >
>> >
>> > On Tue, Jun 2, 2020 at 6:55 AM David Jacot  wrote:
>> >
>> > > Hi Jun,
>> > >
>> > > Thanks for your reply.
>> > >
>> > > 10. I think that both options are likely equivalent from an accuracy
>> > point
>> > > of
>> > > view. If we put the implementation aside, conceptually, I am not
>> > convinced
>> > > by the used based approach because resources don't have a clear owner
>> > > in AK at the moment. A topic can be created by (Principal A, no client
>> > id),
>> > > then can be extended by (no principal, Client B), and finally deleted
>> by
>> > > (Principal C, Client C). This does not sound right to me and I fear
>> that
>> > it
>> > > is not going to be easy to grasp for our users.
>> > >
>> > > Regarding the naming, I do agree that we can make it more future
>> proof.
>> > > I propose `controller_mutations_rate`. I think that differentiating
>> the
>> > > mutations
>> > > from the reads is still a good thing for the future.
>> > >
>> > > 11. I am not convinced by your proposal for the following reasons:
>> > >
>> > > First, in my toy example, I used 101 windows and 7 * 80 requests. We
>> > could
>> > > effectively allocate 5 * 100 requests to the previous windows assuming
>> > that
>> > > they are empty. What shall we do with the remaining 60 requests?
>> Shall we
>> > > allocate them to the current window or shall we divide it among all
>> the
>> > > windo

Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-04 Thread David Jacot
Hi Jun and Anna,

Thank you both for your replies.

Based on our recent discussion, I agree that using a rate is better to
remain
consistent with other quotas. As you both suggested, it seems that changing
the way we compute the rate to better handle spiky workloads and behave a
bit more similarly to the token bucket algorithm makes sense for all quotas
as
well.

I will update the KIP to reflect this.

Anna, I think that we can explain this in this KIP. We can't just say that
the Rate
will be updated in this KIP. I think that we need to give a bit more info.

Best,
David

On Thu, Jun 4, 2020 at 6:31 AM Anna Povzner  wrote:

> Hi Jun and David,
>
> Regarding token bucket vs, Rate behavior. We recently observed a couple of
> cases where a bursty workload behavior would result in long-ish pauses in
> between, resulting in lower overall bandwidth than the quota. I will need
> to debug this a bit more to be 100% sure, but it does look like the case
> described by David earlier in this thread. So, I agree with Jun -- I think
> we should make all quota rate behavior consistent, and probably similar to
> the token bucket one.
>
> Looking at KIP-13, it doesn't describe rate calculation in enough detail,
> but does mention window size. So, we could keep "window size" and "number
> of samples" configs and change Rate implementation to be more similar to
> token bucket:
> * number of samples define our burst size
> * Change the behavior, which could be described as: If a burst happens
> after an idle period, the burst would effectively spread evenly over the
> (now - window) time period, where window is (<number of samples> - 1) *
> <window size>. Which basically describes a token bucket, while keeping the
> current quota configs. I think we can even implement this by changing the
> way we record the last sample or lastWindowMs.
>
> Jun, if we would be changing Rate calculation behavior in bandwidth and
> request quotas, would we need a separate KIP? Shouldn't need to if we
> keep window size and number of samples configs, right?
>
> Thanks,
> Anna
>
> On Wed, Jun 3, 2020 at 3:24 PM Jun Rao  wrote:
>
> > Hi, David,
> >
> > Thanks for the reply.
> >
> > 11. To match the behavior in the Token bucket approach, I was thinking
> that
> > requests that don't fit in the previous time windows will be accumulated
> in
> > the current time window. So, the 60 extra requests will be accumulated in
> > the latest window. Then, the client also has to wait for 12 more secs
> > before throttling is removed. I agree that this is probably a better
> > behavior and it's reasonable to change the existing behavior to this one.
> >
> > To me, it seems that sample_size * num_windows is the same as max burst
> > balance. The latter seems a bit better to configure. The thing is that
> the
> > existing quota system has already been used in quite a few places and if
> we
> > want to change the configuration in the future, there is the migration
> > cost. Given that, do you feel it's better to adopt the  new token bucket
> > terminology or just adopt the behavior somehow into our existing system?
> If
> > it's the former, it would be useful to document this in the rejected
> > section and add a future plan on migrating existing quota configurations.
> >
> > Jun
> >
> >
> > On Tue, Jun 2, 2020 at 6:55 AM David Jacot  wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for your reply.
> > >
> > > 10. I think that both options are likely equivalent from an accuracy
> > point
> > > of
> > > view. If we put the implementation aside, conceptually, I am not
> > convinced
> > > by the used based approach because resources don't have a clear owner
> > > in AK at the moment. A topic can be created by (Principal A, no client
> > id),
> > > then can be extended by (no principal, Client B), and finally deleted
> by
> > > (Principal C, Client C). This does not sound right to me and I fear
> that
> > it
> > > is not going to be easy to grasp for our users.
> > >
> > > Regarding the naming, I do agree that we can make it more future proof.
> > > I propose `controller_mutations_rate`. I think that differentiating the
> > > mutations
> > > from the reads is still a good thing for the future.
> > >
> > > 11. I am not convinced by your proposal for the following reasons:
> > >
> > > First, in my toy example, I used 101 windows and 7 * 80 requests. We
> > could
> > > effectively allocate 5 * 100 requests to the previous windows assuming
> > that
> > > they are empty. What shall we do with the remaining 60 requests? Shall
> we
> > > allocate them to the current window or shall we divide it among all the
> > > windows?
> > >
> > > Second, I don't think that we can safely change the behavior of all the
> > > existing
> > > rates used because it actually changes the computation of the rate as
> > > values
> > > allocated to past windows would expire before they would today.
> > >
> > > Overall, while trying to fit in the current rate, we are going to
> build a
> > > slightly
> > > different version of the rate

[jira] [Created] (KAFKA-10100) LiveLeaders field in LeaderAndIsrRequest is not used anymore

2020-06-04 Thread David Jacot (Jira)
David Jacot created KAFKA-10100:
---

 Summary: LiveLeaders field in LeaderAndIsrRequest is not used 
anymore
 Key: KAFKA-10100
 URL: https://issues.apache.org/jira/browse/KAFKA-10100
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


We have noticed that the `LiveLeaders` field in the LeaderAndIsrRequest is not 
used anywhere but still populated by the controller.

It seems that the field was introduced in AK `0.8.0` and was supposed to be 
removed in AK `0.8.1`: 
[https://github.com/apache/kafka/blob/0.8.0/core/src/main/scala/kafka/cluster/Partition.scala#L194].

I think that we can safely deprecate the field and stop populating it for all 
versions > 0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk14 #169

2020-06-04 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10069: Correctly remove user-defined "predicate" and "negate"


--
[...truncated 3.15 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apach