Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #386

2021-08-03 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 488710 lines...]
[2021-08-04T02:34:00.121Z] ApiVersionsRequestTest > testApiVersionsRequestValidationV0() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV0()[1] STARTED
[2021-08-04T02:34:02.748Z]
[2021-08-04T02:34:02.748Z] ApiVersionsRequestTest > testApiVersionsRequestValidationV0() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV0()[1] PASSED
[2021-08-04T02:34:02.748Z]
[2021-08-04T02:34:02.748Z] ApiVersionsRequestTest > testApiVersionsRequestValidationV3() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV3()[1] STARTED
[2021-08-04T02:34:04.496Z]
[2021-08-04T02:34:04.496Z] ApiVersionsRequestTest > testApiVersionsRequestValidationV3() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV3()[1] PASSED
[2021-08-04T02:34:04.496Z]
[2021-08-04T02:34:04.496Z] ApiVersionsRequestTest > testApiVersionsRequestThroughControlPlaneListener() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequestThroughControlPlaneListener()[1] STARTED
[2021-08-04T02:34:07.119Z]
[2021-08-04T02:34:07.119Z] ApiVersionsRequestTest > testApiVersionsRequestThroughControlPlaneListener() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequestThroughControlPlaneListener()[1] PASSED
[2021-08-04T02:34:07.119Z]
[2021-08-04T02:34:07.119Z] ApiVersionsRequestTest > testApiVersionsRequest() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequest()[1] STARTED
[2021-08-04T02:34:10.689Z]
[2021-08-04T02:34:10.689Z] ApiVersionsRequestTest > testApiVersionsRequest() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequest()[1] PASSED
[2021-08-04T02:34:10.689Z]
[2021-08-04T02:34:10.689Z] ApiVersionsRequestTest > testApiVersionsRequestValidationV0ThroughControlPlaneListener() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV0ThroughControlPlaneListener()[1] STARTED
[2021-08-04T02:34:12.439Z]
[2021-08-04T02:34:12.439Z] ApiVersionsRequestTest > testApiVersionsRequestValidationV0ThroughControlPlaneListener() > kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV0ThroughControlPlaneListener()[1] PASSED
[2021-08-04T02:34:12.439Z]
[2021-08-04T02:34:12.439Z] LogDirFailureTest > testIOExceptionDuringLogRoll() STARTED
[2021-08-04T02:34:19.434Z]
[2021-08-04T02:34:19.434Z] LogDirFailureTest > testIOExceptionDuringLogRoll() PASSED
[2021-08-04T02:34:19.434Z]
[2021-08-04T02:34:19.435Z] LogDirFailureTest > testIOExceptionDuringCheckpoint() STARTED
[2021-08-04T02:34:25.172Z]
[2021-08-04T02:34:25.172Z] LogDirFailureTest > testIOExceptionDuringCheckpoint() PASSED
[2021-08-04T02:34:25.172Z]
[2021-08-04T02:34:25.172Z] LogDirFailureTest > testProduceErrorFromFailureOnCheckpoint() STARTED
[2021-08-04T02:34:30.910Z]
[2021-08-04T02:34:30.910Z] LogDirFailureTest > testProduceErrorFromFailureOnCheckpoint() PASSED
[2021-08-04T02:34:30.910Z]
[2021-08-04T02:34:30.910Z] LogDirFailureTest > brokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure() STARTED
[2021-08-04T02:34:38.018Z]
[2021-08-04T02:34:38.018Z] LogDirFailureTest > brokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure() PASSED
[2021-08-04T02:34:38.018Z]
[2021-08-04T02:34:38.018Z] LogDirFailureTest > testReplicaFetcherThreadAfterLogDirFailureOnFollower() STARTED
[2021-08-04T02:34:43.755Z]
[2021-08-04T02:34:43.756Z] LogDirFailureTest > testReplicaFetcherThreadAfterLogDirFailureOnFollower() PASSED
[2021-08-04T02:34:43.756Z]
[2021-08-04T02:34:43.756Z] LogDirFailureTest > testProduceErrorFromFailureOnLogRoll() STARTED
[2021-08-04T02:34:47.325Z]
[2021-08-04T02:34:47.325Z] LogDirFailureTest > testProduceErrorFromFailureOnLogRoll() PASSED
[2021-08-04T02:34:47.325Z]
[2021-08-04T02:34:47.325Z] LogOffsetTest > testFetchOffsetsBeforeWithChangingSegmentSize() STARTED
[2021-08-04T02:34:51.019Z]
[2021-08-04T02:34:51.019Z] LogOffsetTest > testFetchOffsetsBeforeWithChangingSegmentSize() PASSED
[2021-08-04T02:34:51.019Z]
[2021-08-04T02:34:51.019Z] LogOffsetTest > testGetOffsetsBeforeEarliestTime() STARTED
[2021-08-04T02:34:53.813Z]
[2021-08-04T02:34:53.813Z] LogOffsetTest > testGetOffsetsBeforeEarliestTime() PASSED
[2021-08-04T02:34:53.813Z]
[2021-08-04T02:34:53.813Z] LogOffsetTest > testFetchOffsetByTimestampForMaxTimestampAfterTruncate() STARTED
[2021-08-04T02:34:58.927Z]
[2021-08-04T02:34:58.927Z] LogOffsetTest > testFetchOffsetByTimestampForMaxTimestampAfterTruncate() PASSED
[2021-08-04T02:34:58.927Z]
[2021-08-04T02:34:58.927Z] LogOffsetTest > testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps() STARTED
[2021-08-04T02:35:01.556Z]
[2021-08-04T02:35:01.556Z] LogOffsetTest > testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps() PASSED
[2021-08-04T02:35:01.556Z]
[2021-08-04T02:35:01.556Z] LogOffsetTest > testGetOffsetsForUnknownTopic() STARTED

Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-08-03 Thread Matthias J. Sax
I was playing with the code a little bit, but it seems not to be easy to
use generics to enforce that V is `Comparable`...

We would need to introduce a new interface

  interface ComparableStream<K, V extends Comparable<V>>
      extends KStream<K, V> {
    KTable<K, V> min();
  }

But it also requires a nasty cast to actually use it:

  KStream<String, Integer> stream =
      new StreamsBuilder().stream("");
  KTable<String, Integer> table =
      ((ComparableStream<String, Integer>) stream).min();

If the value-type does not implement `Comparable` the cast would not
compile... Or would there be a simpler way to ensure that min() can only
be called _if_ V is `Comparable`?


So maybe passing in a `Comparator` might be the right way to go;
might also be more flexible anyway. -- My original idea was just to
maybe avoid the `Comparator` argument, as it would make the API nicer
IMHO; fewer parameters is usually better...
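To make the trade-off concrete, here is a minimal, self-contained sketch -- plain
Java with no Streams types, and a made-up `Bid` record -- of why the Comparator
variant needs no bound on V: the caller supplies the ordering, at the cost of one
extra parameter.

    import java.util.Comparator;
    import java.util.List;

    public class ComparatorMinSketch {
        // Example value type that does NOT implement Comparable.
        record Bid(String bidder, double amount) {}

        // Comparator-based min(): no bound on V; the caller supplies the ordering.
        static <V> V min(List<V> values, Comparator<? super V> cmp) {
            V best = values.get(0); // assumes a non-empty list
            for (V v : values) {
                if (cmp.compare(v, best) < 0) best = v;
            }
            return best;
        }

        public static void main(String[] args) {
            List<Bid> bids = List.of(new Bid("a", 100.0), new Bid("b", 99.5));
            // One extra argument, but no cast and no Comparable requirement on Bid:
            System.out.println(min(bids, Comparator.comparingDouble(Bid::amount)));
        }
    }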


I am not sure why we would want to pass a `Function<V, Comparable<VR>>
func` into `min()`?



-Matthias



On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> Alex,
> 
> 
> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil 
> wrote:
> 
>> Mohan / Matthias,
>>
 I think extending min/max to non-numeric types makes sense. Wondering
 why we should require a `Comparator` or if we should require that the
 types implement `Comparable` instead?

>>> Good question. This is what it would look like:
>>>
>>> KTable<K, V> min_comparable()
>>> KTable<K, V> min_comparator(Comparator<V> comp)
>>
>> Not sure if I understood Matthias' proposal correctly, but I think that
>> instead of going with your original proposal (KTable<K, VR>
>> min(Function<V, VR> func, ...)) or mine (KTable<K, V> min(Comparator<V>
>> comparator, ...)), we could simplify it a bit by using a function to
>> extract a Comparable property from the original value:
>>
>> KTable<K, V> min(Function<V, Comparable<VR>> func, ...)
>>
> I will let Matthias clarify. I am not sure why it is simpler than the
> comparator one. Comparable is implemented by the type and not sure exposing
> it this way makes it any better.
> 
>>> I also think, that min/max should not change the value type. Using
>>> `Long` for sum() makes sense though, and also to require a `<V extends
>>> Number>`.
>>
>> Are there any reasons to limit the sum() to integers? Why not use a Double
>> instead?
>>
> Yeah, if the precision is important, then we should stick with Double.
> 
> -mohan
> 
> Best regards,
>> Alexandre
>>
>> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy 
>> wrote:
>>
>>> Matthias,
>>>
>>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax
>>> wrote:
>>>
 Hi,

 I think extending min/max to non-numeric types makes sense. Wondering
 why we should require a `Comparator` or if we should require that the
 types implement `Comparable` instead?

>>> Good question. This is what it would look like:
>>>
>>> KTable<K, V> min_comparable()
>>> KTable<K, V> min_comparator(Comparator<V> comp)
>>>
>>> For min_comparable to work, you still need the bounds "V extends
>>> Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type V"
>>> warning, it has to be at the interface level like this:
>>>
>>> KStream<K, V extends Comparable<V>>
>>>
>>> which is a little messy unless there is a different way to do the same.
>>> The comparator gives a simple way to define an anonymous function.
>>>
>>> What do you think?
>>>
>>>
 I also think, that min/max should not change the value type. Using
 `Long` for sum() make sense though, and also to require a `>>> Number>`.

>>> I guess these are the two possibilities:
>>>
>>> <VR extends Number> Long sum(Function<V, VR> func)
>>> Long sum(Function<V, Number> func)
>>>
>>> Both should work. "func" can return any subtype of Number and I don't
>>> see any advantages with the first version. Can you clarify?
>>>
>>> Thanks
>>> Mohan
>>>
>>>

 -Matthias

 On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> Hi Alex,
>
> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <
 alexandre.bra...@gmail.com>
> wrote:
>
>>
>> My point here is that, when we're only interested in a max/min numeric
>> value, it doesn't matter when we have repeated values, since we'd be only
>> forwarding the number downstream, so I could disregard when the Comparator
>> returns a zero value (meaning equals) and min/max would still be
>> semantically correct. But when we're forwarding the original object
>> downstream instead of its numeric property, it could mean different things
>> semantically depending on how we handle the repeated values.
>>
>> As an example, if I were using max() on a stream of Biddings for products
>> in an auction, the order of the biddings would probably influence the
>> winner if two clients send Biddings with the same value. If we're only
>> forwarding the Bidding value downstream (a double value of 100, for
>> example), it doesn't matter how repeated values are handled, since the max
>> price for this auction would still be 100.00, no matter what Bidding g

[jira] [Resolved] (KAFKA-13145) Renaming the time interval window for better understanding

2021-08-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13145.
---
Resolution: Won't Fix

> Renaming the time interval window for better understanding
> --
>
> Key: KAFKA-13145
> URL: https://issues.apache.org/jira/browse/KAFKA-13145
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
>  
> I have another thought, which is to rename the time interval related windows. 
> Currently, we have 3 types of time interval window:
>  {{TimeWindow}} -> to have {{[start,end)}} time interval
>  {{SessionWindow}} -> to have {{[start,end]}} time interval
>  {{UnlimitedWindow}} -> to have {{[start, MAX_VALUE)}} time interval
> I think the name {{SessionWindow}} is definitely not good here, especially as
> we want to use it in {{SlidingWindows}} now, although it was only used for
> {{SessionWindows}} before. We should name them with the time-interval meaning,
> not the streaming window-function meaning. Because these 3 window types
> are internal use only, it is safe to rename them.
>
> {{TimeWindow}} --> {{InclusiveExclusiveWindow}}
> {{SessionWindow}} / {{SlidingWindow}} --> {{InclusiveInclusiveWindow}}
> {{UnlimitedWindow}} --> {{InclusiveUnboundedWindow}}
>
> See the discussion here:
> https://github.com/apache/kafka/pull/11124#issuecomment-887989639



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-03 Thread Sophie Blee-Goldman
Do we really need a whole DSL operator for this? I think the original name
for this operator -- `cancelRepartition()` -- is itself a sign that this is
not an operation on the stream itself but rather a command/request to
whichever operator would have otherwise triggered this repartition.

What about instead adding a new field to the Grouped/Joined/StreamJoined
config objects that signals them to skip the repartitioning?

The one downside to this specific proposal is that you would then need to
specify this for every stateful operation downstream of the key-changing
operation. So a better alternative might be to introduce this
`skipRepartition` field, or whatever we want to call it, to the config
object of the operator that's actually doing the key-changing operation
which is apparently preserving the partitioning.

Imo this would be more "safe" relative to the current proposal, as the user
has to explicitly consider whether every key-changing operation is indeed
preserving the partitioning. Otherwise you could code up a topology with
several key-changing operations at the beginning which do require
repartitioning. Then you get to the end of the topology and insert one
final key-changing operation that doesn't, assume you can just cancel the
repartition, and suddenly you're wondering why your results are all
screwed up.
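For readers following the thread, a minimal sketch of the status quo under
discussion, using only the existing API (topic name is illustrative): any
key-changing call such as selectKey() marks the stream for repartitioning, even
when the mapper happens to preserve the key, so the downstream
groupByKey().count() inserts a repartition topic that KIP-759 would let the
author opt out of.

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class RepartitionStatusQuo {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KTable<String, Long> counts = builder.<String, String>stream("input")
                // Flags the stream as repartitioned, even though this mapper
                // returns the existing key unchanged...
                .selectKey((key, value) -> key)
                // ...so an internal repartition topic is created here today:
                .groupByKey()
                .count();
            // Printing the topology shows the generated *-repartition topic.
            System.out.println(builder.build().describe());
        }
    }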

On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax  wrote:

> Thanks for the KIP Ivan!
>
> I think it's a good feature to give advanced users more control, and
> allow them to build more efficient applications.
>
> Not sure if I like the proposed name though (the good old "naming
> things" discussion :))
>
> Did you consider alternatives? What about
>
>  - markAsPartitioned()
>  - markAsKeyed()
>  - skipRepartition()
>
> Not sure if there are other ideas for a good name?
>
>
>
> -Matthias
>
> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
> > Hello,
> >
> > I'd like to start a discussion for KIP-759:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> >
> >
> > This is an offshoot of the discussion of KIP-655 for a `distinct`
> > operator, which turned out to be a separate proposal.
> >
> > The proposal is quite trivial, however, we still might consider
> > alternatives (see 'Possible Alternatives' section).
> >
> > Regards,
> >
> > Ivan
>


Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-08-03 Thread Matthias J. Sax
Couple of questions:

 - Why restrict de-duplication for the key only? Should we not also
consider the value (or make it somehow flexible and let the user choose?)

 - I am wondering if the return type should be `KStream` instead of a
`KTable`? If I deduplicate a stream, I might expect a stream back? I
don't really consider a stream de-duplication an aggregation with
"mutable state"...

Also, why would the result key need to be windowed?

Btw: How should out-of-order data be handled? Given that you only want
to consider the key, the value could be different, and thus, if there is
out-of-order data, keeping the one or other value could make a
difference? IMHO, an unordered stream and its ordered "cousin" should
yield the same result? -- Given your example it seems you want to keep
the first record based on offset order. Wondering why?


While I agree that deduplication for overlapping windows is questionable,
I am still wondering if you plan to disallow it (by adding a runtime
check and throwing an exception), or not?
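For context on the windowed result key, a sketch with the existing windowed
count() -- distinct() does not exist yet, but the KIP gives it the same shape:
the result key is Windowed<K> because each (key, window) pair is a separate
aggregate, and with hopping windows of size 40 ms advancing by 10 ms a record at
t=25 falls into three windows.

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class WindowedKeySketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            // Hopping windows: size 40 ms advancing by 10 ms, so a record at
            // t=25 lands in [0,40), [10,50) and [20,60) -- three aggregates.
            KTable<Windowed<String>, Long> counts = builder
                .<String, String>stream("events")
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(40))
                    .advanceBy(Duration.ofMillis(10)))
                .count();
        }
    }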


On 8/1/21 6:42 AM, Ivan Ponomarev wrote:
> Hi Bruno,
> 
> I'm sorry for the delay with the answer. Unfortunately, your messages
> were put in the spam folder; that's why I didn't answer them right away.
> 
> Concerning your question about comparing serialized values vs. using
> equals: I think it must be clear now due to John's explanations.
> Distinct is a stateful operation, thus we will need to use
> serialization. (Although AFAICS the in-memory storage might be a good
> practical solution in many cases).
> 
>> I do currently not see why it should not make sense in hopping
>> windows... I do not understand the following sentence: "...one record
>> can be multiplied instead of deduplication."
> 
> Ok, let me explain.
> 
> As it's written in the KIP, "The distinct operation returns only a first
> record that falls into a new window, and filters out all the other
> records that fall into an already existing window."
> 
> Also it's worth remembering that the result of `distinct` is
> KTable<Windowed<K>, V>, not KStream<K, V>.
> 
> If we have, say, hopping time windows [0, 40], [10, 50], [20, 60] and a
> record (key, val) with timestamp 25 arrives, it will be forwarded three
> times ('multiplied') since it falls into the intersection of all three
> windows. The output will be
> 
> (key@[0/40],  val)
> (key@[10/50], val)
> (key@[20/60], val)
> 
> You can reason about `distinct` operation just like you reason about
> `sum` or `count`. When a record arrives that falls into a window, we
> update the aggregation on this window. For `distinct`, when extra
> records arrive into the same window, we also perform some sort of
> aggregation (we may even count them internally!), but, unlike sum or
> count, we will not forward anything since counter is strictly greater
> than zero.
> 
> You may refer to 'usage examples' of the KIP
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-UsageExamples)
> to get clearer idea of how it works.
> 
>> As I said earlier, I do not think that SQL and the Java Stream API are
>> good arguments to not use a verb
> 
> This is an important matter. As we all know, naming is hard.
> 
> However, `distinct` name is not used just in SQL and Java Streams. It is
> a kind of a standard operation that is used in nearly all the data
> processing frameworks, see all the hyperlinked examples in 'Motivation'
> section of KIP
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-Motivation)
> 
> 
> Please look at it and let me know what you think.
> 
> Regards,
> 
> Ivan
> 
> 29.07.2021 4:49, John Roesler пишет:
>> Hi Bruno,
>>
>> I had previously been thinking to use equals(), since I
>> thought that this might be a stateless operation. Comparing
>> the serialized form requires a serde and a fairly expensive
>> serialization operation, so while byte equality is superior
>> to equals(), we shouldn't use it in operations unless they
>> already require serialization.
>>
>> I changed my mind when I later realized I had been mistaken,
>> and this operation is of course stateful.
>>
>> I hope this helps clarify it.
>>
>> Thanks,
>> -John
>>
>> On Fri, 2021-07-23 at 09:53 +0200, Bruno Cadonna wrote:
>>> Hi Ivan and John,
>>>
>>> 1. John, could you clarify why comparing serialized values seems the way
>>> to go, now?
>>>
>>> 2. Ivan, could you please answer my questions that I posted earlier? I
>>> will repost them here:
>>> Ivan, could you please make this matter a bit clearer in the KIP?
>>> Actually, thinking about it again, I do currently not see why it should
>>> not make sense in hopping windows. Regarding this, I do not understand
>>> the following sentence:
>>>
>>> "hopping and sliding windows do not make much sense for distinct()
>>> because they produce multiple intersected windows, so that one 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-03 Thread Matthias J. Sax
Thanks for the KIP Ivan!

I think it's a good feature to give advanced users more control, and
allow them to build more efficient applications.

Not sure if I like the proposed name though (the good old "naming
things" discussion :))

Did you consider alternatives? What about

 - markAsPartitioned()
 - markAsKeyed()
 - skipRepartition()

Not sure if there are other ideas for a good name?



-Matthias

On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
> Hello,
> 
> I'd like to start a discussion for KIP-759:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> 
> 
> This is an offshoot of the discussion of KIP-655 for a `distinct`
> operator, which turned out to be a separate proposal.
> 
> The proposal is quite trivial, however, we still might consider
> alternatives (see 'Possible Alternatives' section).
> 
> Regards,
> 
> Ivan


[jira] [Created] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-08-03 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13162:
---

 Summary: ElectLeader API must be forwarded to Controller
 Key: KAFKA-13162
 URL: https://issues.apache.org/jira/browse/KAFKA-13162
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 3.0.0


We're missing the logic to forward ElectLeaders requests to the controller. 
This means that `kafka-leader-election.sh` does not work correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13161) Follower leader and ISR state not updated after partition change in KRaft

2021-08-03 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13161:
---

 Summary: Follower leader and ISR state not updated after partition 
change in KRaft
 Key: KAFKA-13161
 URL: https://issues.apache.org/jira/browse/KAFKA-13161
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.0.0


In KRaft when we detect a partition change, we first verify whether any leader 
or follower transitions are needed. Depending on the case, we call either 
`applyLocalLeadersDelta` or `applyLocalFollowersDelta`. In the latter case, we 
are missing a call to `Partition.makeFollower` which is responsible for 
updating LeaderAndIsr state for the partitions. As a result of this, the 
partition state may be left stale. 

The specific consequences of this bug are 1) follower fetching fails since the 
epoch is never updated, and 2) a stale leader may continue to accept Produce 
requests. The latter is the bigger issue since it can lead to log divergence if 
we are appending from both the client and from the fetcher thread at the same 
time. I tested this locally and confirmed that it is possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-08-03 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reopened KAFKA-13132:


Found an issue where we don't sufficiently cover case 2. I have a plan to 
properly cover it.

 cc: [~kkonstantine] 

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller 
> is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we are not ensured the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check)
>  2. We only assign the topic ID when we are associating the log with the 
> partition in replicamanager for the first time. Though in the scenario 
> described above, we have logs associated with partitions that need to be 
> upgraded.
>  
> We should check if the LISR request is resulting in a topic ID addition 
> and add logic for logs already associated with partitions in the replica manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.0 #79

2021-08-03 Thread Apache Jenkins Server
See 




Re: [DISCUSS] Apache Kafka 3.0.0 release plan with new updated dates

2021-08-03 Thread Ryan Dielhenn
Hi Konstantine,

I would like to report another bug in KRaft.

The ConfigHandler that processes dynamic broker config deltas in KRaft
expects that the default resource name for dynamic broker configs is the
old default entity name used in ZK: "<default>". Since dynamic default
broker configs are persisted as empty string in the quorum instead of
"<default>", the brokers are not updating their default configuration
when they see empty string as a resource name in the config delta, and are
throwing a NumberFormatException when they try to parse the resource name
to process it as a per-broker configuration.
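A sketch of the dispatch the fix needs (class and method names here are
illustrative, not the actual BrokerConfigHandler code): accept both the empty
string that KRaft persists and the old ZK sentinel for the default resource,
and only parse anything else as a broker id.

    import java.util.Properties;

    // Illustrative sketch only -- not the real kafka.server.BrokerConfigHandler.
    class BrokerConfigDispatchSketch {
        static final String ZK_DEFAULT_ENTITY = "<default>"; // old ZK sentinel

        void processConfigChanges(String resourceName, Properties props) {
            // KRaft serializes the default resource as "", ZK used "<default>";
            // accepting both avoids the NumberFormatException described above.
            if (resourceName.isEmpty() || resourceName.equals(ZK_DEFAULT_ENTITY)) {
                updateDefaultConfig(props);
            } else {
                updatePerBrokerConfig(Integer.parseInt(resourceName), props);
            }
        }

        void updateDefaultConfig(Properties props) { /* apply cluster-wide default */ }
        void updatePerBrokerConfig(int brokerId, Properties props) { /* apply to one broker */ }
    }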

I filed a JIRA: https://issues.apache.org/jira/browse/KAFKA-13160

I also have a PR to fix this: https://github.com/apache/kafka/pull/11168

I think that this should be a blocker for 3.0 because dynamic default
broker configs will not be usable in KRaft otherwise.

Best,
Ryan Dielhenn

On Sat, Jul 31, 2021 at 10:42 AM Konstantine Karantasis <
kkaranta...@apache.org> wrote:

> Thanks Ryan,
>
> Approved. Seems also like a low risk fix.
> With that opportunity, let's make sure there are no other configs that
> would need a similar validation.
>
> Konstantine
>
> On Fri, Jul 30, 2021 at 8:33 AM Ryan Dielhenn
>  wrote:
>
> > Hey Konstantine,
> >
> > Thanks for the question. If these configs are not validated the user's
> > experience will be affected and upgrades from 3.0 will be harder.
> >
> > Best,
> > Ryan Dielhenn
> >
> > On Thu, Jul 29, 2021 at 3:59 PM Konstantine Karantasis <
> > kkaranta...@apache.org> wrote:
> >
> > > Thanks for reporting this issue Ryan.
> > >
> > > I believe what you mention corresponds to the ticket you created here:
> > > https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-13151
> > >
> > > What happens if the configurations are present but the broker doesn't
> > > fail at startup when configured to run in KRaft mode?
> > > Asking to see if we have any workarounds available.
> > >
> > > Thanks,
> > > Konstantine
> > >
> > > On Thu, Jul 29, 2021 at 2:51 PM Ryan Dielhenn
> > >  wrote:
> > >
> > > > Hi,
> > > >
> > > > Disregard log.cleanup.policy being included in this blocker.
> > > >
> > > > Best,
> > > > Ryan Dielhenn
> > > >
> > > > On Thu, Jul 29, 2021 at 2:38 PM Ryan Dielhenn <
> rdielh...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hey Konstantine,
> > > > >
> > > > > I'd like to report another bug in KRaft.
> > > > >
> > > > log.cleanup.policy, alter.config.policy.class.name, and
> > > > create.topic.policy.class.name are all unsupported by KRaft but KRaft
> > > > servers allow them to be configured. I believe this should be
> > > > considered a blocker and that KRaft servers should fail startup if any
> > > > of these are configured. I do not have a PR yet but will soon.
> > > > >
> > > > > On another note, I have a PR for the dynamic broker configuration
> fix
> > > > > here: https://github.com/apache/kafka/pull/11141
> > > > >
> > > > > Best,
> > > > > Ryan Dielhenn
> > > > >
> > > > > On Wed, May 26, 2021 at 2:48 PM Konstantine Karantasis
> > > > >  wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> Please find below the updated release plan for the Apache Kafka
> > 3.0.0
> > > > >> release.
> > > > >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177046466
> > > > >>
> > > > >> New suggested dates for the release are as follows:
> > > > >>
> > > > >> KIP Freeze is 09 June 2021 (same date as in the initial plan)
> > > > >> Feature Freeze is 30 June 2021 (new date, extended by two weeks)
> > > > >> Code Freeze is 14 July 2021 (new date, extended by two weeks)
> > > > >>
> > > > >> At least two weeks of stabilization will follow Code Freeze.
> > > > >>
> > > > >> The release plan is up to date and currently includes all the
> > approved
> > > > >> KIPs
> > > > >> that are targeting 3.0.0.
> > > > >>
> > > > >> Please let me know if you have any objections with the recent
> > > extension
> > > > of
> > > > >> Feature Freeze and Code Freeze or any other concerns.
> > > > >>
> > > > >> Regards,
> > > > >> Konstantine
> > > > >>
> > > > >
> > > >
> > >
> >
>


[jira] [Resolved] (KAFKA-12646) Implement snapshot generation on brokers

2021-08-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-12646.

Resolution: Fixed

> Implement snapshot generation on brokers
> 
>
> Key: KAFKA-12646
> URL: https://issues.apache.org/jira/browse/KAFKA-12646
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: Jose Armando Garcia Sancio
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12647) Implement loading snapshot in the broker

2021-08-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-12647.

Resolution: Fixed

> Implement loading snapshot in the broker
> 
>
> Key: KAFKA-12647
> URL: https://issues.apache.org/jira/browse/KAFKA-12647
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)
Ryan Dielhenn created KAFKA-13160:
-

 Summary: Fix BrokerConfigHandler to expect empty string as the 
resource name for dynamic default broker configs in KRaft
 Key: KAFKA-13160
 URL: https://issues.apache.org/jira/browse/KAFKA-13160
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0
Reporter: Ryan Dielhenn
Assignee: Ryan Dielhenn
 Fix For: 3.0.0


In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/<default>. When these configs are sent to the brokers in a fetch 
response, the BrokerConfigHandler checks if the resource name is "<default>" to 
do a default update and converts the resource name to an integer otherwise to 
do a per-broker config update. In KRaft, dynamic default broker configs are 
serialized in the quorum with empty string instead of "<default>". This was 
causing the BrokerConfigHandler to throw a NumberFormatException for dynamic 
default broker configs since the resource name for them is not "<default>" or a 
singular integer. This handler should be fixed to expect empty string for the 
dynamic default broker configs when using KRaft.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12997) Expose log record append time to the controller/broker

2021-08-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-12997.

Resolution: Fixed

> Expose log record append time to the controller/broker
> --
>
> Key: KAFKA-12997
> URL: https://issues.apache.org/jira/browse/KAFKA-12997
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Niket Goel
>Assignee: Jose Armando Garcia Sancio
>Priority: Minor
>  Labels: kip-500
>
> The snapshot records are generated by each individual quorum participant 
> which also stamps the append time in the records. These append times are 
> generated from a different clock (except in the case of the quorum leader) as 
> compared to the metadata log records (where timestamps are stamped by the 
> leader).
> To enable having a single clock to compare timestamps, 
> https://issues.apache.org/jira/browse/KAFKA-12952 adds a timestamp field to 
> the snapshot header which should contain the append time of the highest 
> record contained in the snapshot (which will be in leader time).
> This JIRA tracks exposing and wiring the batch timestamp such that it can be 
> provided to the SnapshotWriter at the time of snapshot creation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13159) Enable system tests for transactions in KRaft mode

2021-08-03 Thread David Arthur (Jira)
David Arthur created KAFKA-13159:


 Summary: Enable system tests for transactions in KRaft mode
 Key: KAFKA-13159
 URL: https://issues.apache.org/jira/browse/KAFKA-13159
 Project: Kafka
  Issue Type: Test
Reporter: David Arthur
Assignee: David Arthur
 Fix For: 3.0.0


Previously, we disabled several system tests involving transactions in KRaft 
mode. Now that KIP-730 is complete and transactions work in KRaft, we need to 
re-enable these tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13158) Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest

2021-08-03 Thread YI-CHEN WANG (Jira)
YI-CHEN WANG created KAFKA-13158:


 Summary: Replace EasyMock and PowerMock with Mockito for 
ConnectClusterStateImpl Test and ConnectorPluginsResourceTest
 Key: KAFKA-13158
 URL: https://issues.apache.org/jira/browse/KAFKA-13158
 Project: Kafka
  Issue Type: Sub-task
Reporter: YI-CHEN WANG
Assignee: YI-CHEN WANG






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13157) Kafka-dump-log needs to support snapshot records

2021-08-03 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13157:
--

 Summary: Kafka-dump-log needs to support snapshot records
 Key: KAFKA-13157
 URL: https://issues.apache.org/jira/browse/KAFKA-13157
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


Extend the kafka-dump-log tool to allow the user to view and print KRaft 
snapshot files.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13148) Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE

2021-08-03 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13148.
-
Resolution: Fixed

Closing this since it was fixed by KAFKA-12158.

> Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE
> ---
>
> Key: KAFKA-13148
> URL: https://issues.apache.org/jira/browse/KAFKA-13148
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Niket Goel
>Priority: Major
>  Labels: kip-500
>
> In some cases the RaftClient will return Long.MAX_VALUE:
> {code:java}
> /**
>  * Append a list of records to the log. The write will be scheduled for some time
>  * in the future. There is no guarantee that appended records will be written to
>  * the log and eventually committed. However, it is guaranteed that if any of the
>  * records become committed, then all of them will be.
>  *
>  * If the provided current leader epoch does not match the current epoch, which
>  * is possible when the state machine has yet to observe the epoch change, then
>  * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
>  * not possible to become committed. The state machine is expected to discard all
>  * uncommitted entries after observing an epoch change.
>  *
>  * @param epoch the current leader epoch
>  * @param records the list of records to append
>  * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the
>  *         records could be committed; null if no memory could be allocated for
>  *         the batch at this time
>  * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size
>  *         of the records is greater than the maximum batch size; if this
>  *         exception is thrown, none of the elements in records were committed
>  */
> Long scheduleAtomicAppend(int epoch, List<T> records);
> {code}
> The controller doesn't handle this case:
> {code:java}
>   // If the operation returned a batch of records, those records need to be
>   // written before we can return our result to the user. Here, we hand off
>   // the batch of records to the raft client. They will be written out
>   // asynchronously.
>   final long offset;
>   if (result.isAtomic()) {
>       offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
>   } else {
>       offset = raftClient.scheduleAppend(controllerEpoch, result.records());
>   }
>   op.processBatchEndOffset(offset);
>   writeOffset = offset;
>   resultAndOffset = ControllerResultAndOffset.of(offset, result);
>   for (ApiMessageAndVersion message : result.records()) {
>       replay(message.message(), Optional.empty(), offset);
>   }
>   snapshotRegistry.getOrCreateSnapshot(offset);
>   log.debug("Read-write operation {} will be completed when the log " +
>       "reaches offset {}.", this, resultAndOffset.offset());
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12615) Correct comments for the method Selector.clear

2021-08-03 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao resolved KAFKA-12615.
-
Resolution: Fixed

> Correct comments for the method Selector.clear
> --
>
> Key: KAFKA-12615
> URL: https://issues.apache.org/jira/browse/KAFKA-12615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
>  According to my understanding, the second clearCompletedSends, which is 
> highlighted as follows, should be clearCompletedReceives:
> /**
>  * Clears all the results from the previous poll. This is invoked by Selector 
> at the start of
>  * a poll() when all the results from the previous poll are expected to have 
> been handled.
>  * 
>  * SocketServer uses clearCompletedSends() and *clearCompletedSends*() to
>  * clear `completedSends` and `completedReceives` as soon as they are 
> processed to avoid
>  * holding onto large request/response buffers from multiple connections 
> longer than necessary.
>  * Clients rely on Selector invoking {@link #clear()} at the start of each 
> poll() since memory usage
>  * is less critical and clearing once-per-poll provides the flexibility to 
> process these results in
>  * any order before the next poll.
>  */



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12789) Remove Stale comments for meta response handling logic

2021-08-03 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao resolved KAFKA-12789.
-
Resolution: Fixed

> Remove Stale comments for meta response handling logic
> --
>
> Key: KAFKA-12789
> URL: https://issues.apache.org/jira/browse/KAFKA-12789
> Project: Kafka
>  Issue Type: Improvement
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the following paragraph looks like a stale 
> comment.
> {code:java}
> public void handleSuccessfulResponse(RequestHeader requestHeader, long now, 
> MetadataResponse response) {
> ...
> // Don't update the cluster if there are no valid nodes...the 
> topic we want may still be in the process of being
> // created which means we will get errors and no nodes until it 
> exists
> if (response.brokers().isEmpty()) {
> log.trace("Ignoring empty metadata response with correlation 
> id {}.", requestHeader.correlationId());
> this.metadata.failedUpdate(now);
> } else {
> this.metadata.update(inProgress.requestVersion, response, 
> inProgress.isPartialUpdate, now);
> }
> ...
> {code}
> The comments above mean we may get errors and no nodes if the topic we 
> want is still in the process of being created.
>  However, every meta request will return all brokers from the logic of the 
> server side, as follows:
> {code:java}
>   def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
> ...
> val brokers = metadataCache.getAliveBrokers
> ...
>   }
> {code}
> I studied the related git commit history and figured out why.
>  # This comment was first introduced in KAFKA-642 (e11447650a), which means 
> the meta request only needed brokers related to the topics we want.
>  # KAFKA-1535 (commitId: 4ebcdfd51f) changed the server-side logic, which has 
> the metadata response contain all alive brokers rather than just the ones 
> needed for the given topics.
>  # However, this comment has been retained till now.
> So according to my understanding, this comment looks like a stale one and 
> can be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #385

2021-08-03 Thread Apache Jenkins Server
See