[jira] [Created] (KAFKA-16572) allow defining number of disks per broker in ClusterTest

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16572:
--

 Summary: allow defining number of disks per broker in ClusterTest
 Key: KAFKA-16572
 URL: https://issues.apache.org/jira/browse/KAFKA-16572
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


This is a follow-up of KAFKA-16559



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16559) allow defining number of disks per broker in TestKitNodes

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16559.

Fix Version/s: 3.8.0
   Resolution: Fixed

> allow defining number of disks per broker in TestKitNodes
> -
>
> Key: KAFKA-16559
> URL: https://issues.apache.org/jira/browse/KAFKA-16559
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Gaurav Narula
>Priority: Major
> Fix For: 3.8.0
>
>
> from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409
> That allows us to run the reassignment tests.
> Also, we should enhance setNumBrokerNodes 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
> to accept an extra argument to define the number of folders (by 
> setLogDirectories)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16467: Add how to integrate with kafka repo [kafka-site]

2024-04-16 Thread via GitHub


showuon commented on code in PR #596:
URL: https://github.com/apache/kafka-site/pull/596#discussion_r1568111065


##
README.md:
##
@@ -10,4 +10,32 @@ You can run it with the following command, note that it 
requires docker:
 
 Then you can open [localhost:8080](http://localhost:8080) on your browser and 
browse the documentation.
 
-To kill the process, just type ctrl + c
\ No newline at end of file
+To kill the process, just type ctrl + c.
+
+## How to preview the latest documentation changes in Kafka repository?
+
+1. Generating document from kafka repository:
+
+```shell
+# change directory into kafka repository
+cd KAFKA_REPO
+./gradlew clean siteDocTar
+# supposing built with scala 2.13
tar zxvf core/build/distributions/kafka_2.13-$(./gradlew properties | grep version: | awk '{print $NF}' | head -n 1)-site-docs.tgz
+```
+
+2. Copying the generated documents from Kafka repository into kafka-site, and 
preview them (note that it requires docker):
+
+```shell
+# change directory into kafka-site repository
+cd KAFKA_SITE_REPO
+# copy the generated documents into dev folder
+rm -rf dev
+mkdir dev
+# change directory into kafka repository
+cp -r KAFKA_REPO/site-docs/* dev

Review Comment:
   I don't think this comment is correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16571) reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition

2024-04-16 Thread David Mao (Jira)
David Mao created KAFKA-16571:
-

 Summary: reassign_partitions_test.bounce_brokers should wait for 
messages to be sent to every partition
 Key: KAFKA-16571
 URL: https://issues.apache.org/jira/browse/KAFKA-16571
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


This particular system test tries to bounce brokers while produce is ongoing. 
The test also has rf=3 and min.isr=3 configured, so if any brokers are bounced 
before records are produced to every partition, it is possible to run into 
OutOfOrderSequence exceptions similar to what is described in 
https://issues.apache.org/jira/browse/KAFKA-14359

When running the produce_consume_validate for the reassign_partitions_test, 
instead of waiting for 5 acked messages, we should wait for messages to be 
acked on the full set of partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful

2024-04-16 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-16570:
--

 Summary: FenceProducers API returns "unexpected error" when 
successful
 Key: KAFKA-16570
 URL: https://issues.apache.org/jira/browse/KAFKA-16570
 Project: Kafka
  Issue Type: Bug
Reporter: Justine Olshan
Assignee: Justine Olshan


When we want to fence a producer using the admin client, we send an 
InitProducerId request.

There is logic in that API to fence (and abort) any ongoing transactions and 
that is what the API relies on to fence the producer. However, this handling 
also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we 
want to actually get a new producer ID and want to retry until the ID is 
supplied or we time out.
[https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
 



In the fence-producer case, we don't retry; instead, we have no handling 
for CONCURRENT_TRANSACTIONS and log a message about an unexpected error.
[https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
 

This is not unexpected though and the operation was successful. We should just 
swallow this error and treat this as a successful run of the command. 
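
A minimal sketch of the direction described above (the class and method names 
below are illustrative, not the actual FenceProducersHandler internals):

{code:java}
import org.apache.kafka.common.protocol.Errors;

public final class FenceErrorHandlingSketch {
    /**
     * Returns true if the InitProducerId error should be treated as a
     * successful fence. CONCURRENT_TRANSACTIONS only means the coordinator
     * is aborting the ongoing transaction as part of fencing, so the
     * operation still succeeded and should not be logged as unexpected.
     */
    static boolean fenceSucceeded(Errors error) {
        return error == Errors.NONE || error == Errors.CONCURRENT_TRANSACTIONS;
    }
}
{code}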



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16569) Target Assignment Format Change

2024-04-16 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16569:


 Summary: Target Assignment Format Change
 Key: KAFKA-16569
 URL: https://issues.apache.org/jira/browse/KAFKA-16569
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: Ritika Reddy


Currently, the assignment is stored as a Map; we 
want to change it to a list.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16568) Add JMH Benchmarks for assignor performance testing

2024-04-16 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16568:


 Summary: Add JMH Benchmarks for assignor performance testing 
 Key: KAFKA-16568
 URL: https://issues.apache.org/jira/browse/KAFKA-16568
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: Ritika Reddy


Three benchmarks will be used to test the performance and efficiency of 
the consumer group rebalance process (a rough JMH skeleton is sketched below):
 * Client Assignors (assign method)
 * Server Assignors (assign method)
 * Target Assignment Builder (build method)
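
A rough JMH skeleton for the kind of benchmark described above (the class name 
and setup below are placeholders; the real benchmarks would build the assignor 
input, e.g. member subscriptions and topic metadata, in the setup method):

{code:java}
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
public class ServerSideAssignorBenchmarkSketch {

    @Param({"100", "1000", "10000"})
    public int memberCount;   // scale the group size to measure how assignment cost grows

    @Setup(Level.Trial)
    public void setUp() {
        // build subscriptions / topic metadata for memberCount members here
    }

    @Benchmark
    public void assign() {
        // invoke the assignor's assign(...) on the prepared input and discard the result
    }
}
{code}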



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-16 Thread Walter Hernandez (Jira)
Walter Hernandez created KAFKA-16567:


 Summary: Add New Stream Metrics based on KIP-869
 Key: KAFKA-16567
 URL: https://issues.apache.org/jira/browse/KAFKA-16567
 Project: Kafka
  Issue Type: Task
Reporter: Walter Hernandez
 Fix For: 4.0.0


Add the following metrics to the state updater:
 * restoring-active-tasks: count
 * restoring-standby-tasks: count
 * paused-active-tasks: count
 * paused-standby-tasks: count
 * idle-ratio: percentage
 * restore-ratio: percentage
 * checkpoint-ratio: percentage
 * restore-records-total: count
 * restore-records-rate: rate
 * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] KIP-924: customizable task assignment for Streams

2024-04-16 Thread Rohan Desai
https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams

As this KIP has been open for a while, and gone through a couple rounds of
review/revision, I'm calling a vote to get it approved.


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-16 Thread Nick Telford
That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager (via
StateManagerUtil) for each Task, which then initializes it.

This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.

What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman 
wrote:

> I don't think we need to *require* a constructor accept the TaskId, but we
> would definitely make sure that the RocksDB state store changes its
> constructor to one that accepts the TaskID (which we can do without
> deprecation since it's an internal API), and custom state stores can just
> decide for themselves whether they want to opt-in/use the TaskId param
> or not. I mean custom state stores would have to opt-in anyways by
> implementing the new StoreSupplier#get(TaskId) API and the only
> reason to do that would be to have created a constructor that accepts
> a TaskId
>
> Just to be super clear about the proposal, this is what I had in mind.
> It's actually fairly simple and wouldn't add much to the scope of the
> KIP (I think -- if it turns out to be more complicated than I'm assuming,
> we should definitely do whatever has the smallest LOE to get this done
>
> Anyways, the (only) public API changes would be to add this new
> method to the StoreSupplier API:
>
> default T get(final TaskId taskId) {
>     return get();
> }
>
> We can decide whether or not to deprecate the old #get but it's not
> really necessary and might cause a lot of turmoil, so I'd personally
> say we just leave both APIs in place.
>
> And that's it for public API changes! Internally, we would just adapt
> each of the rocksdb StoreSupplier classes to implement this new
> API. So for example with the RocksDBKeyValueBytesStoreSupplier,
> we just add
>
> @Override
> public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
>     return returnTimestampedStore ?
>         new RocksDBTimestampedStore(name, metricsScope(), taskId) :
>         new RocksDBStore(name, metricsScope(), taskId);
> }
>
> And of course add the TaskId parameter to each of the actual
> state store constructors returned here.
>
> Does that make sense? It's entirely possible I'm missing something
> important here, but I think this would be a pretty small addition that
> would solve the problem you mentioned earlier while also being
> useful to anyone who uses custom state stores.
>
> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
> wrote:
>
> > Hi Sophie,
> >
> > Interesting idea! Although what would that mean for the StateStore
> > interface? Obviously we can't require that the constructor take the
> TaskId.
> > Is it enough to add the parameter to the StoreSupplier?
> >
> > Would doing this be in-scope for this KIP, or are we over-complicating
> it?
> >
> > Nick
> >
> > On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman  >
> > wrote:
> >
> > > Somewhat minor point overall, but it actually drives me crazy that you
> > > can't get access to the taskId of a StateStore until #init is called.
> > This
> > > has caused me a huge headache personally (since the same is true for
> > > processors and I was trying to do something that's probably too hacky
> to
> > > actually complain about here lol)
> > >
> > > Can we just change the StateStoreSupplier to receive and pass along the
> > > taskId when creating a new store? Presumably by adding a new version of
> > the
> > > #get method that takes in a taskId parameter? We can have it default to
> > > invoking the old one for compatibility reasons and it should be
> > completely
> > > safe to tack on.
> > >
> > > Would also prefer the same for a ProcessorSupplier, but that's
> definitely
> > > outside the scope of this KIP
> > >
> > > On Fri, Apr 12, 2024 at 3:31 AM Nick Telford 
> > > wrote:
> > >
> > > > On further thought, it's clear that this can't work for one simple
> > > reason:
> > > > StateStores don't know their associated TaskId (and hence, their
> > > > StateDirectory) until the init() call. Therefore, committedOffset()
> > can't
> > > > be called before init(), unless we also added a StateStoreContext
> > > argument
> > > > to committedOffset(), which I think might be trying to shoehorn too
> > much
> > > > into committedOffset().
> > > >
> > > > I still don't like the idea of the Streams engine maintaining the
> cache
> > > of
> > > > changelog offsets independently of stores, mostly because of the
> > > > maintenance burden of the code duplication, but it looks like we'll
> > have
> > > to
> > > > live with it.
> > > >
> > > > Unless you have any better ideas?
> > > >
> > > > Regards,
> > > > Nick
> > > >
> > > > On Wed, 10 Apr 2024 at 14:12, Nick Telford 
> > > wrote:
> > > >
> > > > > Hi Bruno,
> > > > >
> > > > > Immediately after I sent my response, I looked at the cod

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-16 Thread Sophie Blee-Goldman
Also ignore everything I said about Streams earlier. I didn't look closely
enough on my first pass over the KIP and thought this was changing the
DeserializationExceptionHandler in Streams. I see now that this is
actually about the consumer client's DeserializationException so everything
I said about using a ByteArrayDeserializer and Kafka Streams doesn't
apply here. The important thing is just that it still deserializes one
record
at a time, and essentially throws this when it fails to convert the Record
type into a ConsumerRecord type. So there's always only one record
at a time to consider.

Sorry for any confusion I caused

On Tue, Apr 16, 2024 at 11:15 AM Sophie Blee-Goldman 
wrote:

> Ah, thanks for the additional context. I should have looked at the code
> before I opened my mouth (so to speak)
>
> In that case, I fully agree that using Record instead of ConsumerRecord
> makes sense. It does indeed seem like by definition, if there is a
> DeserializationException then there is no ConsumerRecord since this
> is where/how it gets thrown:
>
> try {
>     ...
>     return new ConsumerRecord<>(...);
> } catch (RuntimeException e) {
>     ...
>     throw new RecordDeserializationException(...);
> }
>
> As you mentioned the Record is an input to the method so we definitely have
> one of those, and imo, it makes sense to use. As far as I can tell it's
> just
> a regular public interface so exposing it shouldn't be an issue just based
> on
> the class itself. But I'm still a bit concerned about the checkstyle
> complaint.
>
> I'll try to find someone who can explain why or if we should avoid
> returning
> a Record type here. Other than that, I'd say the KIP LGTM as-is and we
> could kick off voting
>
> On Tue, Apr 16, 2024 at 10:47 AM Frédérik Rouleau
>  wrote:
>
>> Thanks Sophie,
>>
>> I can write something in the KIP on how KStreams solves that issue, but as
>> I can't create a Wiki account, I will have to find someone to do this on
>> my
>> behalf (if someone can work on solving that wiki account creation, it
>> would
>> be great).
>>
>> The biggest difference between Record and ConsumerRecord is that data are
>> stored respectively using ByteBuffer and Byte array.
>>
>> For the Record option, the object already exists in the parsing method, so
>> it's roughly just a parameter type change in the Exception. The point is
>> just about exposing the Record class externally. By the way, the name
>> Record is also making some IDE a bit crazy by confusing it with the new
>> Java Record feature. An alternative could be to create another wrapper
>> type
>> or just include key and value ByteBuffer in the
>> RecordDeserializationException itself.
>>
>> For the ConsumerRecord option, it requires to allocate Byte arrays, even
>> if
>> the user does not need it (skip the poison pill for example). This might
>> have some extra cost on GC for some specific use case.
>>
>> Fred
>>
>


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-16 Thread Sophie Blee-Goldman
Ah, thanks for the additional context. I should have looked at the code
before I opened my mouth (so to speak)

In that case, I fully agree that using Record instead of ConsumerRecord
makes sense. It does indeed seem like by definition, if there is a
DeserializationException then there is no ConsumerRecord since this
is where/how it gets thrown:

try {
    ...
    return new ConsumerRecord<>(...);
} catch (RuntimeException e) {
    ...
    throw new RecordDeserializationException(...);
}

As you mentioned the Record is an input to the method so we definitely have
one of those, and imo, it makes sense to use. As far as I can tell it's just
a regular public interface so exposing it shouldn't be an issue just based
on
the class itself. But I'm still a bit concerned about the checkstyle
complaint.

I'll try to find someone who can explain why or if we should avoid returning
a Record type here. Other than that, I'd say the KIP LGTM as-is and we
could kick off voting

On Tue, Apr 16, 2024 at 10:47 AM Frédérik Rouleau
 wrote:

> Thanks Sophie,
>
> I can write something in the KIP on how KStreams solves that issue, but as
> I can't create a Wiki account, I will have to find someone to do this on my
> behalf (if someone can work on solving that wiki account creation, it would
> be great).
>
> The biggest difference between Record and ConsumerRecord is that data are
> stored respectively using ByteBuffer and Byte array.
>
> For the Record option, the object already exists in the parsing method, so
> it's roughly just a parameter type change in the Exception. The point is
> just about exposing the Record class externally. By the way, the name
> Record is also making some IDE a bit crazy by confusing it with the new
> Java Record feature. An alternative could be to create another wrapper type
> or just include key and value ByteBuffer in the
> RecordDeserializationException itself.
>
> For the ConsumerRecord option, it requires to allocate Byte arrays, even if
> the user does not need it (skip the poison pill for example). This might
> have some extra cost on GC for some specific use case.
>
> Fred
>


[jira] [Created] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16566:
--

 Summary: Update static membership fencing system test to support 
new protocol
 Key: KAFKA-16566
 URL: https://issues.apache.org/jira/browse/KAFKA-16566
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Lianet Magrans
Assignee: Lianet Magrans
 Fix For: 3.8.0


consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
which verifies the sequence in which static members join a group when using 
conflicting instance ids. This behaviour differs between the classic and 
consumer protocols, so the tests should be updated to set the right expectations 
when running with the new consumer protocol. Note that what the tests cover 
(params, setup) applies to both protocols; it is the expected results that 
differ. 

When static members with conflicting instance ids join a group:

Classic protocol: all members join the group with the same group instance id, 
and then the first one will eventually receive a heartbeat error with 
FencedInstanceIdException.

Consumer protocol: a new member with an instance id already in use is not able to 
join, receiving an UnreleasedInstanceIdException in the response to the heartbeat 
it sends to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-16 Thread Frédérik Rouleau
Thanks Sophie,

I can write something in the KIP on how KStreams solves that issue, but as
I can't create a Wiki account, I will have to find someone to do this on my
behalf (if someone can work on solving that wiki account creation, it would
be great).

The biggest difference between Record and ConsumerRecord is that the data are
stored using a ByteBuffer and a byte array, respectively.

For the Record option, the object already exists in the parsing method, so
it's roughly just a parameter type change in the Exception. The point is
just about exposing the Record class externally. By the way, the name
Record is also making some IDEs a bit crazy by confusing it with the new
Java Record feature. An alternative could be to create another wrapper type
or just include the key and value ByteBuffers in the
RecordDeserializationException itself.

For the ConsumerRecord option, it requires allocating byte arrays even if
the user does not need them (for example, when skipping a poison pill). This
might have some extra GC cost for some specific use cases.

Fred


[jira] [Created] (KAFKA-16565) IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned

2024-04-16 Thread Kirk True (Jira)
Kirk True created KAFKA-16565:
-

 Summary: IncrementalAssignmentConsumerEventHandler throws error 
when attempting to remove a partition that isn't assigned
 Key: KAFKA-16565
 URL: https://issues.apache.org/jira/browse/KAFKA-16565
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
 Fix For: 3.8.0


In {{verifiable_consumer.py}}, the IncrementalAssignmentConsumerEventHandler revokes partitions as follows:

 
{code:java}
def handle_partitions_revoked(self, event):
    self.revoked_count += 1
    self.state = ConsumerState.Rebalancing
    self.position = {}
    for topic_partition in event["partitions"]:
        topic = topic_partition["topic"]
        partition = topic_partition["partition"]
        self.assignment.remove(TopicPartition(topic, partition))
{code}
If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that 
isn't in the list, an error is thrown. For now, we should first check that the 
{{TopicPartition}} is in the list, and if not, log a warning or something.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-16 Thread Sophie Blee-Goldman
As for the ConsumerRecord vs Record thing -- I personally think the
other alternative that Kirk mentioned would make more sense here,
that is, returning an Optional<ConsumerRecord<byte[], byte[]>> rather
than changing the type from ConsumerRecord to Record.

I'm not sure why checkstyle is saying we shouldn't use the Record
class, but I'm a bit uncomfortable with ignoring this unless we have
someone who can explain why it's complaining and whether this
applies to our situation or not. I'm worried the Record class has
something to do with the old legacy records and using it here
would be a step backward. Note that most of the classes that
implement the Record interface have Legacy in their name, and
neither the ConsumerRecord nor ProducerRecord that most people
are familiar with extend the Record class.

That said, I literally have no context on the history of Record or why
it's not supposed to be used, so I welcome someone with more info
here to chime in. Without additional context, I'd say we should just
use the ConsumerRecord type as initially proposed, and make the
getter API return an Optional<ConsumerRecord<byte[], byte[]>>.

I'm also personally unaware of what might cause us to be unable to
form a ConsumerRecord. Matthias and/or Kirk, can you elaborate
on the specific cases we're worried about here? We should really
highlight those in the getter javadocs to explain why it's an Optional.
What fields would/could be missing?

On Tue, Apr 16, 2024 at 10:11 AM Sophie Blee-Goldman 
wrote:

> I think some missing context here (which can maybe be added in the
> Motivation section as background) is that the deserialization is actually
> done within Streams, not within the Consumer. Since the consumers
> in Kafka Streams might be subscribed to multiple topics with different
> data types, it has to use a ByteArrayDeserializer so that the consumer
> just hands back the plain bytes and then Kafka Streams can do all the
> deserialization based on its knowledge of the topic->serde mapping.
>
> Streams will just store the set of records returned by each #poll as plain
> bytes, and only when they are dequeued does it actually attempt to
> deserialize the record. So there's only one record being deserialized at
> a time, and each one can be handled separately by the deserialization
> exception handler. Which is what KIP-334 relies on.
>
> I do think it's worth saying something like that in the KIP somewhere,
> since it's somewhat obscure knowledge of Kafka Streams internals
> that not everyone will immediately know about. Feel free to just copy
> what I wrote or write something more succinct 🙂
>
> By the way, there are some minor formatting typos in the code snippet
> for the KIP. Not a big deal, but if you're editing the KIP anyways you
> might as well fix that
>
> On Tue, Apr 16, 2024 at 1:33 AM Frédérik Rouleau
>  wrote:
>
>> Hi Almog,
>>
>> I think you do not understand the behavior that was introduced with the
>> KIP-334.
>> When you have a DeserializationException, if you set the proper seek call
>> to skip the faulty record, the next poll call will return the remaining
>> records to process and not a new list of records. When the KIP was
>> released, I made a demo project
>> https://github.com/fred-ro/kafka-poison-pills not sure it's still
>> working,
>> I should spend time maintaining it. The only issue with the initial KIP is
>> that you do not have access to the data of the faulty record which makes
>> DLQ implementation quite difficult.
>>
>> Regards,
>> Fred
>>
>


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-16 Thread Sophie Blee-Goldman
I think some missing context here (which can maybe be added in the
Motivation section as background) is that the deserialization is actually
done within Streams, not within the Consumer. Since the consumers
in Kafka Streams might be subscribed to multiple topics with different
data types, it has to use a ByteArrayDeserializer so that the consumer
just hands back the plain bytes and then Kafka Streams can do all the
deserialization based on its knowledge of the topic->serde mapping.

Streams will just store the set of records returned by each #poll as plain
bytes, and only when they are dequeued does it actually attempt to
deserialize the record. So there's only one record being deserialized at
a time, and each one can be handled separately by the deserialization
exception handler. Which is what KIP-334 relies on.

I do think it's worth saying something like that in the KIP somewhere,
since it's somewhat obscure knowledge of Kafka Streams internals
that not everyone will immediately know about. Feel free to just copy
what I wrote or write something more succinct 🙂

By the way, there are some minor formatting typos in the code snippet
for the KIP. Not a big deal, but if you're editing the KIP anyways you
might as well fix that

On Tue, Apr 16, 2024 at 1:33 AM Frédérik Rouleau
 wrote:

> Hi Almog,
>
> I think you do not understand the behavior that was introduced with the
> KIP-334.
> When you have a DeserializationException, if you set the proper seek call
> to skip the faulty record, the next poll call will return the remaining
> records to process and not a new list of records. When the KIP was
> released, I made a demo project
> https://github.com/fred-ro/kafka-poison-pills not sure it's still working,
> I should spend time maintaining it. The only issue with the initial KIP is
> that you do not have access to the data of the faulty record which makes
> DLQ implementation quite difficult.
>
> Regards,
> Fred
>


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-16 Thread Sophie Blee-Goldman
I don't think we need to *require* a constructor accept the TaskId, but we
would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskID (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for themselves whether they want to opt-in/use the TaskId param
or not. I mean custom state stores would have to opt-in anyways by
implementing the new StoreSupplier#get(TaskId) API and the only
reason to do that would be to have created a constructor that accepts
a TaskId

Just to be super clear about the proposal, this is what I had in mind.
It's actually fairly simple and wouldn't add much to the scope of the
KIP (I think -- if it turns out to be more complicated than I'm assuming,
we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API changes would be to add this new
method to the StoreSupplier API:

default T get(final TaskId taskId) {
    return get();
}

We can decide whether or not to deprecate the old #get but it's not
really necessary and might cause a lot of turmoil, so I'd personally
say we just leave both APIs in place.

And that's it for public API changes! Internally, we would just adapt
each of the rocksdb StoreSupplier classes to implement this new
API. So for example with the RocksDBKeyValueBytesStoreSupplier,
we just add

@Override
public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
    return returnTimestampedStore ?
        new RocksDBTimestampedStore(name, metricsScope(), taskId) :
        new RocksDBStore(name, metricsScope(), taskId);
}

And of course add the TaskId parameter to each of the actual
state store constructors returned here.

Does that make sense? It's entirely possible I'm missing something
important here, but I think this would be a pretty small addition that
would solve the problem you mentioned earlier while also being
useful to anyone who uses custom state stores.
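
For illustration, a custom supplier opting in could look roughly like this
(MyStore / MyStoreSupplier are made-up names, not anything in the codebase):

public class MyStoreSupplier implements KeyValueBytesStoreSupplier {

    @Override
    public KeyValueStore<Bytes, byte[]> get() {
        // old path: no task information available at construction time
        return new MyStore(name());
    }

    @Override
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        // new path: the store knows its TaskId before init() is called
        return new MyStore(name(), taskId);
    }

    // name(), metricsScope(), etc. omitted for brevity
}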

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
wrote:

> Hi Sophie,
>
> Interesting idea! Although what would that mean for the StateStore
> interface? Obviously we can't require that the constructor take the TaskId.
> Is it enough to add the parameter to the StoreSupplier?
>
> Would doing this be in-scope for this KIP, or are we over-complicating it?
>
> Nick
>
> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman 
> wrote:
>
> > Somewhat minor point overall, but it actually drives me crazy that you
> > can't get access to the taskId of a StateStore until #init is called.
> This
> > has caused me a huge headache personally (since the same is true for
> > processors and I was trying to do something that's probably too hacky to
> > actually complain about here lol)
> >
> > Can we just change the StateStoreSupplier to receive and pass along the
> > taskId when creating a new store? Presumably by adding a new version of
> the
> > #get method that takes in a taskId parameter? We can have it default to
> > invoking the old one for compatibility reasons and it should be
> completely
> > safe to tack on.
> >
> > Would also prefer the same for a ProcessorSupplier, but that's definitely
> > outside the scope of this KIP
> >
> > On Fri, Apr 12, 2024 at 3:31 AM Nick Telford 
> > wrote:
> >
> > > On further thought, it's clear that this can't work for one simple
> > reason:
> > > StateStores don't know their associated TaskId (and hence, their
> > > StateDirectory) until the init() call. Therefore, committedOffset()
> can't
> > > be called before init(), unless we also added a StateStoreContext
> > argument
> > > to committedOffset(), which I think might be trying to shoehorn too
> much
> > > into committedOffset().
> > >
> > > I still don't like the idea of the Streams engine maintaining the cache
> > of
> > > changelog offsets independently of stores, mostly because of the
> > > maintenance burden of the code duplication, but it looks like we'll
> have
> > to
> > > live with it.
> > >
> > > Unless you have any better ideas?
> > >
> > > Regards,
> > > Nick
> > >
> > > On Wed, 10 Apr 2024 at 14:12, Nick Telford 
> > wrote:
> > >
> > > > Hi Bruno,
> > > >
> > > > Immediately after I sent my response, I looked at the codebase and
> came
> > > to
> > > > the same conclusion. If it's possible at all, it will need to be done
> > by
> > > > creating temporary StateManagers and StateStores during rebalance. I
> > > think
> > > > it is possible, and probably not too expensive, but the devil will be
> > in
> > > > the detail.
> > > >
> > > > I'll try to find some time to explore the idea to see if it's
> possible
> > > and
> > > > report back, because we'll need to determine this before we can vote
> on
> > > the
> > > > KIP.
> > > >
> > > > Regards,
> > > > Nick
> > > >
> > > > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna 
> > wrote:
> > > >
> > > >> Hi Nick,
> > > >>
> > > >> Thanks for reacting on my comments so quickly!
> > > >>
> > > >>
> > > >> 2.
> > > >> Some thoughts on your proposal.
> > > 

Re: Permission to contribute to Apache Kafka Project

2024-04-16 Thread Josep Prat
Hi Robin,

You are now set up. Thanks for your interest in Apache Kafka.

Best,

On Tue, Apr 16, 2024 at 3:31 PM Robin Han  wrote:

> Hi there,
>
> My Jira ID is 'robinhan' and I'd like to ask permission to contribute to
> the Apache Kafka Project.
>
>
> I have encountered an error when upgrading from version 3.4.0 to 3.7.0 in
> Kraft mode. I would like to fix this issue by submitting a Jira Ticket and
> a Github PR. Unfortunately, I don't have the permission to create a Jira
> Ticket at the moment.
> [image: image.png]
>
> ---
> Robin Han
>


-- 
[image: Aiven] 

*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io   |   +491715557497
aiven.io    |   
     
*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2817

2024-04-16 Thread Apache Jenkins Server
See 




Permission to contribute to Apache Kafka Project

2024-04-16 Thread Robin Han
Hi there,

My Jira ID is 'robinhan' and I'd like to ask permission to contribute to
the Apache Kafka Project.


I have encountered an error when upgrading from version 3.4.0 to 3.7.0 in
Kraft mode. I would like to fix this issue by submitting a Jira Ticket and
a Github PR. Unfortunately, I don't have the permission to create a Jira
Ticket at the moment.
[image: image.png]

---
Robin Han


[jira] [Created] (KAFKA-16564) Apply `Xlint` to java code in core module

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16564:
--

 Summary: Apply `Xlint` to java code in core module
 Key: KAFKA-16564
 URL: https://issues.apache.org/jira/browse/KAFKA-16564
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-16 Thread Andrew Schofield
Hi Jun,
Thanks for your reply.

42.1. That’s a sensible improvement. Done.

47,56. Done. All instances of BaseOffset changed to FirstOffset.

105. I think that would be in a future KIP. Personally, I don’t mind having
a non-contiguous set of values in this KIP.

114. Done.

115. If the poll is just returning a single record because there is not much
data to consume, committing on every record is OK. It’s inefficient but 
acceptable.
If the first poll returns just one record, but many more have piled up while
the first one was being processed, the next poll has the opportunity to return
a bunch of records and then these will be able to be committed together.
So, my answer is that optimisation on the broker to return batches of
records when the records are available is the approach we will take here.

116. Good idea. Done.

117. I’ve rewritten the Kafka Broker Migration section. Let me know what you 
think.
I am discussing the configuration to enable the feature in the mailing list with
David Jacot also, so I anticipate a bit of change in this area still.

Thanks,
Andrew

> On 15 Apr 2024, at 23:34, Jun Rao  wrote:
> 
> Hi, Andrew,
> 
> Thanks for the updated KIP.
> 
> 42.1 "If the share group offset is altered multiple times when the group
> remains empty, it would be harmless if the same state epoch was reused to
> initialize the state."
> Hmm, how does the partition leader know for sure that it has received the
> latest share group offset if epoch is reused?
> Could we update the section "Group epoch - Trigger a rebalance" that
> AdminClient.alterShareGroupOffsets causes the group epoch to be bumped too?
> 
> 47,56 "my view is that BaseOffset should become FirstOffset in ALL schemas
> defined in the KIP."
> Yes, that seems better to me.
> 
> 105. "I have another non-terminal state in mind for 3."
> Should we document it?
> 
> 114. session.timeout.ms in the consumer configuration is deprecated in
> KIP-848. So, we need to remove it from the shareConsumer configuration.
> 
> 115. I am wondering if it's a good idea to always commit acks on
> ShareConsumer.poll(). In the extreme case, each batch may only contain a
> single record and each poll() only returns a single batch. This will cause
> each record to be committed individually. Is there a way for a user to
> optimize this?
> 
> 116. For each new RPC, could we list the associated acls?
> 
> 117. Since this KIP changes internal records and RPCs, it would be useful
> to document the upgrade process.
> 
> Jun
> 
> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield 
> wrote:
> 
>> Hi Jun,
>> Thanks for your questions.
>> 
>> 41.
>> 41.1. The partition leader obtains the state epoch in the response from
>> ReadShareGroupState. When it becomes a share-partition leader,
>> it reads the share-group state and one of the things it learns is the
>> current state epoch. Then it uses the state epoch in all subsequent
>> calls to WriteShareGroupState. The fencing is to prevent writes for
>> a previous state epoch, which are very unlikely but which would mean
>> that a leader was using an out-of-date epoch and was likely no longer
>> the current leader at all, perhaps due to a long pause for some reason.
>> 
>> 41.2. If the group coordinator were to set the SPSO, wouldn’t it need
>> to discover the initial offset? I’m trying to avoid yet another
>> inter-broker
>> hop.
>> 
>> 42.
>> 42.1. I think I’ve confused things. When the share group offset is altered
>> using AdminClient.alterShareGroupOffsets, the group coordinator WILL
>> update the state epoch. I don’t think it needs to update the group epoch
>> at the same time (although it could) because the group epoch will have
>> been bumped when the group became empty. If the share group offset
>> is altered multiple times when the group remains empty, it would be
>> harmless if the same state epoch was reused to initialize the state.
>> 
>> When the share-partition leader updates the SPSO as a result of
>> the usual flow of record delivery, it does not update the state epoch.
>> 
>> 42.2. The share-partition leader will notice the alteration because,
>> when it issues WriteShareGroupState, the response will contain the
>> error code FENCED_STATE_EPOCH. This is supposed to be the
>> last-resort way of catching this.
>> 
>> When the share-partition leader handles its first ShareFetch request,
>> it learns the state epoch from the response to ReadShareGroupState.
>> 
>> In normal running, the state epoch will remain constant, but, when there
>> are no consumers and the group is empty, it might change. As a result,
>> I think it would be sensible when the set of share sessions transitions
>> from 0 to 1, which is a reasonable proxy for the share group transitioning
>> from empty to non-empty, for the share-partition leader to issue
>> ReadShareGroupOffsetsState to validate the state epoch. If its state
>> epoch is out of date, it can then ReadShareGroupState to re-initialize.
>> 
>> I’ve changed the KIP accordingly.
>> 
>> 47, 

[jira] [Created] (KAFKA-16563) migration to KRaft hanging after KeeperException

2024-04-16 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16563:
-

 Summary: migration to KRaft hanging after KeeperException
 Key: KAFKA-16563
 URL: https://issues.apache.org/jira/browse/KAFKA-16563
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Luke Chen
Assignee: Luke Chen


When running the ZK to KRaft migration process, we encountered an issue where the 
migration hangs and the `ZkMigrationState` cannot move to the `MIGRATION` 
state. After investigation, the root cause is that the PollEvent didn't 
retry on a retriable KeeperException when it should have.

 
{code:java}
2024-04-11 21:27:55,393 INFO [KRaftMigrationDriver id=5] Encountered ZooKeeper error during event PollEvent. Will retry. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [controller-5-migration-driver-event-handler]
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /migration
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
    at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:570)
    at kafka.zk.KafkaZkClient.createInitialMigrationState(KafkaZkClient.scala:1701)
    at kafka.zk.KafkaZkClient.getOrCreateMigrationState(KafkaZkClient.scala:1689)
    at kafka.zk.ZkMigrationClient.$anonfun$getOrCreateMigrationRecoveryState$1(ZkMigrationClient.scala:109)
    at kafka.zk.ZkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationClient.scala:69)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.recoverMigrationStateFromZK(KRaftMigrationDriver.java:169)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$1900(KRaftMigrationDriver.java:62)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver$PollEvent.run(KRaftMigrationDriver.java:794)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission

2024-04-16 Thread Andrew Schofield
Hi Nikhil,
I agree with Christo. This is a good improvement and I think your choice of
Alter permission on the cluster is the best available.

Thanks,
Andrew

> On 15 Apr 2024, at 12:33, Christo Lolov  wrote:
>
> Heya Nikhil,
>
> Thank you for raising this KIP!
>
> Your proposal makes sense to me. In essence you are saying that the
> permission required by WriteTxnMarkers should be the same as for CreateAcls
> and DeleteAcls, which is reasonable. If we trust an administrator to assign
> the correct permissions then we should also trust them to be able to abort
> a hanging transaction.
>
> I would support this KIP if it is put to the vote unless there are other
> suggestions for improvements!
>
> Best,
> Christo
>
> On Thu, 11 Apr 2024 at 16:48, Nikhil Ramakrishnan <
> ramakrishnan.nik...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I would like to start a discussion for
>>
>> KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1037%3A+Allow+WriteTxnMarkers+API+with+Alter+Cluster+Permission
>>
>> The WriteTxnMarkers API was originally used for inter-broker
>> communication only. This required the ClusterAction permission on the
>> Cluster resource to invoke.
>>
>> In KIP-664, we modified the WriteTxnMarkers API so that it could be
>> invoked externally from the Kafka AdminClient to safely abort a
>> hanging transaction. Such usage is more aligned with the Alter
>> permission on the Cluster resource, which includes other
>> administrative actions invoked from the Kafka AdminClient (i.e.
>> CreateAcls and DeleteAcls). This KIP proposes allowing the
>> WriteTxnMarkers API to be invoked with the Alter permission on the
>> Cluster.
>>
>> I am looking forward to your thoughts and suggestions for improvement!
>>
>> Thanks,
>> Nikhil
>>



Re: [PR] KAFKA-16467: Add how to integrate with kafka repo [kafka-site]

2024-04-16 Thread via GitHub


FrankYang0529 commented on PR #596:
URL: https://github.com/apache/kafka-site/pull/596#issuecomment-2058627999

   Hi @showuon, thanks for reviewing. I've addressed all comments and added you 
as co-author.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16562) Install the ginkgo to tools folder

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16562.

Resolution: Invalid

sorry this jira is for yunikorn ...

> Install the ginkgo to tools folder
> --
>
> Key: KAFKA-16562
> URL: https://issues.apache.org/jira/browse/KAFKA-16562
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> https://github.com/apache/yunikorn-k8shim/blob/master/scripts/run-e2e-tests.sh#L111
> `run-e2e-tests.sh` installs ginkgo into the `GOBIN` folder. However, our 
> `make e2e_test` assumes all tools are installed at yunikorn-k8shim/tools, and 
> hence we see a `Command 'ginkgo' not found` error if we don't export 
> `GOBIN`.
> It seems to me this jira should include the following changes.
> 1. Move the ginkgo installation from run-e2e-tests to the makefile, so that `make 
> tools` installs all required tools.
> 2. The makefile should install ginkgo into the tools folder; the tools folder is added 
> to PATH, so users can run the e2e tests more easily.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16562) Install the ginkgo to tools folder

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16562:
--

 Summary: Install the ginkgo to tools folder
 Key: KAFKA-16562
 URL: https://issues.apache.org/jira/browse/KAFKA-16562
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


https://github.com/apache/yunikorn-k8shim/blob/master/scripts/run-e2e-tests.sh#L111

`run-e2e-tests.sh` installs ginkgo into the `GOBIN` folder. However, our `make 
e2e_test` assumes all tools are installed at yunikorn-k8shim/tools, and hence 
we see a `Command 'ginkgo' not found` error if we don't export `GOBIN`.

It seems to me this jira should include the following changes.

1. Move the ginkgo installation from run-e2e-tests to the makefile, so that `make 
tools` installs all required tools.
2. The makefile should install ginkgo into the tools folder; the tools folder is added to 
PATH, so users can run the e2e tests more easily.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-16 Thread Frédérik Rouleau
Hi Almog,

I think you do not understand the behavior that was introduced with the
KIP-334.
When you have a DeserializationException, if you set the proper seek call
to skip the faulty record, the next poll call will return the remaining
records to process and not a new list of records. When the KIP was
released, I made a demo project
https://github.com/fred-ro/kafka-poison-pills not sure it's still working,
I should spend time maintaining it. The only issue with the initial KIP is
that you do not have access to the data of the faulty record which makes
DLQ implementation quite difficult.
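
For reference, the KIP-334 pattern looks roughly like this (minimal sketch; it
assumes a KafkaConsumer named consumer and a process() helper of your own):

try {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
    process(records);
} catch (RecordDeserializationException e) {
    // Skip the poison pill: seek past the offset that failed to deserialize.
    // The next poll() then resumes with the remaining records.
    consumer.seek(e.topicPartition(), e.offset() + 1);
    // KIP-1036 would additionally expose the faulty record's data here,
    // which is what makes a DLQ implementation practical.
}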

Regards,
Fred


[jira] [Created] (KAFKA-16561) Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config

2024-04-16 Thread Yangkun Ai (Jira)
Yangkun Ai created KAFKA-16561:
--

 Summary: Disable `allow.auto.create.topics` in MirrorMaker2 
Consumer Config
 Key: KAFKA-16561
 URL: https://issues.apache.org/jira/browse/KAFKA-16561
 Project: Kafka
  Issue Type: Improvement
Reporter: Yangkun Ai


While using MirrorMaker 2.0 (MM2), I noticed that the consumer used by the 
connector does not disable the ALLOW_AUTO_CREATE_TOPICS_CONFIG option. This 
leads to the possibility of a topic being immediately recreated if I attempt to 
delete it from the source cluster while MirrorMaker 2.0 is running. I believe 
that automatic creation of new topics in this scenario is unreasonable, hence I 
think it is necessary to explicitly disable this option in the code.
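
A minimal sketch of the proposed change, wherever MM2 builds the consumer 
properties for the connector (the props map below is illustrative):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Map<String, Object> consumerProps = new HashMap<>();
// ... existing MM2 consumer settings ...
// pin auto topic creation off so deleting a source topic does not recreate it
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
{code}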



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16560) Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16560:
--

 Summary: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig
 Key: KAFKA-16560
 URL: https://issues.apache.org/jira/browse/KAFKA-16560
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


origin discussion: 
https://github.com/apache/kafka/pull/15715#discussion_r1564660916

It seems to me this jira should address the following tasks.

1. Make them immutable. We have adopted the builder pattern, so all changes 
should be completed in the builder phase.

2. Make all `Builder#build()` methods accept no arguments. Instead, we should add 
new setters for those arguments (a rough sketch follows).
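
Illustrative before/after for item 2 (the builder and setter names below are 
hypothetical, just to show the shape of the change):

{code:java}
// before: the argument is passed at build time
// ClusterConfig config = new ClusterConfig.Builder().build(someArgument);

// after: build() takes no arguments and each value gets its own setter
ClusterConfig config = new ClusterConfig.Builder()
    .setSomeArgument(someArgument)
    .build();
{code}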



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16559) Support to define the number of data folders in ClusterTest

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16559:
--

 Summary: Support to define the number of data folders in 
ClusterTest
 Key: KAFKA-16559
 URL: https://issues.apache.org/jira/browse/KAFKA-16559
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


That allows us to run the reassignment tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-04-16 Thread Claude Warren
The difference between p.i.q.window.count and p.i.q.window.num:

To be honest, I may have misunderstood your definition of window num.  But
here is what I have in mind:


   1. p.i.q.window.size.seconds the length of time that a window will
   exist.  This is also the maximum time between PID uses where the PID is
   considered to be the same.  Reuse of the PID after p.i.q.window.size.seconds
   triggers recording the PID as a new PID.
   Define a new configuration option "producer.id.quota.window.count" as
   the number of windows active in window.size.seconds.
   2. p.i.q.window.count (the new one), how many sections to break
   p.i.q.window.size.seconds into.  In the initial KIP this is 2.  This value
   gives us the timing for creating a new layer in the layered bloom filter.
   So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a new
   layer will be created and an old layer or layers will be removed.  The
   layered bloom filter will add layers to keep the probability of false
   positives in range.
   3. p.i.q.window.num, specified as 11 in the KIP.  I thought this was how
   many PIDs were expected in each window.  In the original KIP this means
   that we expect to see the PID 22 times (2 windows x 11).  In the Layered
   Bloom filter this would be the N value for the Shape.
   4. p.i.q.window.fpr (needs to be specified) the expected false positive
   rate.  Not sure how to express this in the config in a way that makes sense
   but something like 0.06 or the like.  This is the P value for the
   Shape.  See https://hur.st/bloomfilter for a Bloom filter calculator.

Once we have the N and P for the Shape the shape can be instantiated as
"Shape s = Shape.fromNP( int n, double p );"

In the layered filter once N items have been added to the layer a new layer
is created.  Layers are removed after p.i.q.window.size.seconds so if there
is a burst of PIDs the number of layers will expand and then shrink back as
the PIDs expire.  While running there is always at least 1 layer.
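
A rough sketch of that rotation (plain Java standing in for the layered Bloom
filter; the Layer class, its Set<Long> "pids" field, and the other names below
are made up for illustration, and saturation-driven extra layers are omitted):

long layerLifeMs = TimeUnit.SECONDS.toMillis(windowSizeSeconds);  // p.i.q.window.size.seconds
long layerStepMs = layerLifeMs / windowCount;                     // p.i.q.window.count slices

Deque<Layer> layers = new ArrayDeque<>();  // each Layer holds createdMs and one filter

void maybeRotate(long nowMs) {
    // start a new layer every layerStepMs
    if (layers.isEmpty() || nowMs - layers.peekLast().createdMs >= layerStepMs) {
        layers.addLast(new Layer(nowMs));
    }
    // drop layers older than the full window
    while (!layers.isEmpty() && nowMs - layers.peekFirst().createdMs >= layerLifeMs) {
        layers.removeFirst();
    }
}

boolean trackPid(long producerId, long nowMs) {
    maybeRotate(nowMs);
    boolean seen = layers.stream().anyMatch(l -> l.pids.contains(producerId));
    // always (re)record the PID in the newest layer so a long-lived PID is not
    // counted again when the oldest layer expires
    layers.peekLast().pids.add(producerId);
    return !seen;  // true -> counts as a "new" PID for the quota metric
}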

Some calculated points:

   - No layer will span more than p.i.q.window.size.seconds /
   p.i.q.window.count seconds.
   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
   p.i.q.window.count =2 and a rate of 22 PIDs per minute there will be 2
   layers.
   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
   p.i.q.window.count =2 and a rate of 10 PIDs per minute there will be 2
   layers.
   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
   p.i.q.window.count =2 and that no PIDS have been seen in the last 30
   seconds there will be 2 layers.
   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
   p.i.q.window.count =2 and that no PIDS have been seen in the last 60
   seconds there will be 1 layer.
   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
   p.i.q.window.count =2 and a rate of 23 to 44 PIDs per minute there will be
   4 layers.
   - the false positive rate across the layers remains at or below Shape.P
   - Assuming Shape.N = 11 and Shape.P = 0.06 the Bloom filter at each
   layer will consume 35 bytes. https://hur.st/bloomfilter provides a quick
   calculator for other values.

Claude






On Tue, Apr 16, 2024 at 8:06 AM Claude Warren  wrote:

> Let's put aside the CPC datasketch idea and just discuss the Bloom filter
> approach.
>
> I think the problem with the way the KIP is worded is that PIDs are only
> added if they are not seen in either of the Bloom filters.
>
> So an early PID is added to the first filter and the associated metric is
> updated.
> that PID is seen multiple times over the next 60 minutes, but is not added
> to the Bloom filters again.
> once the 60 minutes elapses the first filter is cleared, or removed and a
> new one started.  In any case the PID is no longer recorded in any extant
> Bloom filter.
> the PID is seen again and is added to the newest bloom filter and the
> associated metric is updated.
>
> I believe at this point the metric is incorrect, the PID has been counted
> 2x, when it has been in use for the entire time.
>
> The "track" method that I added solves this problem by ensuring that the
> PID is always seen in the latter half of the set of Bloom filters.  In the
> case of 2 filters that is always the second one, but remember that the
> number of layers will grow as the filters become saturated.  So if your
> filter is intended to hold 500 PIDs and the 501st PID is registered before
> the expiration a new layer (Bloom filter) is added for new PIDS to be added
> into.
>
> On Mon, Apr 15, 2024 at 5:00 PM Omnia Ibrahim 
> wrote:
>
>> Hi Claude,
>> Thanks for the implementation of the LayeredBloomFilter in apache
>> commons.
>>
>> > Define a new configuration option "producer.id.quota.window.count" as
>> > the number of windows active in window.size.seconds.
>> What is the difference between “producer.id.quota.window.count” and
>> producer.id.quota.window.num
>>
>> > Basically the kip says, if the PID is found in either of the Bl