Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2916

2024-05-17 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16793) Heartbeat API for upgrading ConsumerGroup

2024-05-17 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-16793:
---

 Summary: Heartbeat API for upgrading ConsumerGroup
 Key: KAFKA-16793
 URL: https://issues.apache.org/jira/browse/KAFKA-16793
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu
Assignee: Dongnuo Lyu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15723) KRaft support in ListOffsetsRequestTest

2024-05-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-15723.

Fix Version/s: 3.8.0
   Resolution: Fixed

> KRaft support in ListOffsetsRequestTest
> ---
>
> Key: KAFKA-15723
> URL: https://issues.apache.org/jira/browse/KAFKA-15723
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Mickael Maison
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
> Fix For: 3.8.0
>
>
> The following tests in ListOffsetsRequestTest in 
> core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala need to be 
> updated to support KRaft
> 37 : def testListOffsetsErrorCodes(): Unit = {
> 84 : def testListOffsetsMaxTimeStampOldestVersion(): Unit = {
> 112 : def testCurrentEpochValidation(): Unit = {
> 173 : def testResponseIncludesLeaderEpoch(): Unit = {
> 210 : def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
> Scanned 261 lines. Found 0 KRaft tests out of 5 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE

2024-05-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16544.

Fix Version/s: 3.8
   Resolution: Fixed

> DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames 
> should return null instead of throwing NPE
> --
>
> Key: KAFKA-16544
> URL: https://issues.apache.org/jira/browse/KAFKA-16544
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8
>
>
> {code:java}
>  * @return A future map from topic names to descriptions which can be 
> used to check
>  * the status of individual description if the describe topic 
> request used
>  * topic names, otherwise return null, this request succeeds only 
> if all the
>  * topic descriptions succeed
> {code}
> According to the docs, it should return null if we try to get a result that 
> does not match the request. For example, we call `allTopicNames` after passing 
> a `TopicIdCollection`. However, the current implementation throws an NPE 
> directly.
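
A minimal sketch of the documented contract, assuming a configured Admin
client (the bootstrap address and topic ID below are placeholders):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.Uuid;

public class DescribeByIdExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker

        try (Admin admin = Admin.create(props)) {
            Uuid topicId = Uuid.randomUuid(); // placeholder; use a real topic ID
            DescribeTopicsResult result =
                admin.describeTopics(TopicCollection.ofTopicIds(List.of(topicId)));

            // The request used topic IDs, so per the Javadoc the name-keyed
            // view should be null rather than throwing an NPE.
            KafkaFuture<Map<String, TopicDescription>> byName = result.allTopicNames();
            System.out.println(byName == null
                ? "allTopicNames() returned null as documented"
                : "allTopicNames() returned a future");
        }
    }
}
{code}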



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2914

2024-05-17 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16792) Enable related unit tests that fail only for new consumer with poll(0)

2024-05-17 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16792:
--

 Summary: Enable related unit tests that fail only for new consumer 
with poll(0)
 Key: KAFKA-16792
 URL: https://issues.apache.org/jira/browse/KAFKA-16792
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Lianet Magrans


Enable the following unit tests for the new async consumer in KafkaConsumerTest:
- testFetchStableOffsetThrowInPoll
- testCurrentLag
- testListOffsetShouldUpdateSubscriptions



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16791) Add thread detection to ClusterTestExtensions

2024-05-17 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16791:
--

 Summary: Add thread detection to ClusterTestExtensions
 Key: KAFKA-16791
 URL: https://issues.apache.org/jira/browse/KAFKA-16791
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


`ClusterTestExtensions` should implement `BeforeAllCallback` and 
`AfterAllCallback`, invoking `TestUtils.verifyNoUnexpectedThreads` from both 
callbacks.
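
A rough sketch of that shape, assuming `TestUtils.verifyNoUnexpectedThreads`
takes a context string (the real signature and the extension's other duties
may differ):

{code:java}
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

import kafka.utils.TestUtils;

// Sketch only: shows where the thread checks would hook into JUnit 5.
public class ClusterTestExtensions implements BeforeAllCallback, AfterAllCallback {

    @Override
    public void beforeAll(ExtensionContext context) {
        // Fail fast if a previous test class leaked threads.
        TestUtils.verifyNoUnexpectedThreads("@BeforeAll");
    }

    @Override
    public void afterAll(ExtensionContext context) {
        // Verify this test class cleaned up after itself.
        TestUtils.verifyNoUnexpectedThreads("@AfterAll");
    }
}
{code}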



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-1028: Docker Official Image for Apache Kafka

2024-05-17 Thread Vedarth Sharma
Hi everyone!

Thanks for participating in the discussion and voting! KIP-1028 has been
accepted with the following +1 votes:

- Chris Egerton (binding)
- Manikumar (binding)
- Justine Olshan (binding)
- Hector Geraldino (non-binding)

The target release for this KIP is 3.8.0

Thanks and regards,
Vedarth


On Wed, May 15, 2024 at 12:31 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> +1 (non-binding) Thanks Vedarth!
>
> From: dev@kafka.apache.org At: 05/14/24 12:13:14 UTC-4:00 To:
> dev@kafka.apache.org
> Subject: [VOTE] KIP-1028: Docker Official Image for Apache Kafka
>
> Hi everyone,
>
> I'd like to call a vote on KIP-1028 which aims to introduce a JVM based
> Docker Official Image (DOI) for Apache Kafka.
>
> KIP -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1028%3A+Docker+Official+Image+for+Apache+Kafka
>
> Discussion thread -
> https://lists.apache.org/thread/6vvwx173jcbgj6vqoq6bo8c0k0ntym0w
>
> Thanks and regards,
> Vedarth
>
>
>


[jira] [Created] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-17 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16790:
-

 Summary: Calls to RemoteLogManager are made before it is configured
 Key: KAFKA-16790
 URL: https://issues.apache.org/jira/browse/KAFKA-16790
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.8.0
Reporter: Christo Lolov


BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1), 
which in turn calls RemoteLogManager#onLeadershipChange (2); however, the 
RemoteLogManager is configured after the BrokerMetadataPublisher starts running 
(3, 4). This is incorrect: we either need to initialise the RemoteLogManager 
before we start the BrokerMetadataPublisher, or we need to skip calls to 
onLeadershipChange if the RemoteLogManager is not initialised.
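
A self-contained Java illustration of the second option (the real code path
is Scala in ReplicaManager; the types and names below are assumptions, not
Kafka's actual classes):

{code:java}
import java.util.Set;

public class LeadershipGuardSketch {

    // Stub standing in for the real RemoteLogManager.
    interface RemoteLogManager {
        boolean isConfigured();
        void onLeadershipChange(Set<String> leaders, Set<String> followers);
    }

    static void maybeNotify(RemoteLogManager rlm,
                            Set<String> leaders,
                            Set<String> followers) {
        if (rlm == null || !rlm.isConfigured()) {
            // Metadata arrived before BrokerServer configured the RLM: skip.
            return;
        }
        rlm.onLeadershipChange(leaders, followers);
    }

    public static void main(String[] args) {
        // With no configured RLM, the notification is silently skipped.
        maybeNotify(null, Set.of("topicA-0"), Set.of("topicB-1"));
        System.out.println("notification skipped until the RLM is configured");
    }
}
{code}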

(1) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]

(2) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]

(3) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]

(4) 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]

The way to reproduce the problem is by looking at the following branch 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14

2024-05-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16763.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Upgrade to scala 2.12.19 and scala 2.13.14
> --
>
> Key: KAFKA-16763
> URL: https://issues.apache.org/jira/browse/KAFKA-16763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
> Fix For: 3.8.0
>
>
> scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19)
>  
> scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1044: A proposal to change idempotent producer -- server implementation

2024-05-17 Thread Justine Olshan
Respectfully, I don't agree. Why should we persist useless information
for clients that are long gone and will never use it?
This is why I'm suggesting we do something smarter when it comes to storing
data and only store data we actually need and have a use for.

This is why I suggest the heartbeat. It gives us clear information (up to
the heartbeat interval) about which producers are worth keeping and which
are not.
I'm not in favor of building a new and complicated system to try to guess
which information is needed. In my mind, if we have a ton of legitimately
active producers, we should scale up memory. If we don't, there is no reason
to have high memory usage.

Fixing the client also allows us to fix some of the other issues we have
with idempotent producers.

Justine

On Fri, May 17, 2024 at 12:46 AM Claude Warren  wrote:

> I think that the point here is that the design that assumes that you can
> keep all the PIDs in memory for all server configurations and all usages
> and all client implementations is fraught with danger.
>
> Yes, there are solutions already in place (KIP-854) that attempt to address
> this problem, and other proposed solutions to remove that have undesirable
> side effects (e.g. Heartbeat interrupted by IP failure for a slow producer
> with a long delay between posts).  KAFKA-16229 (Slow expiration of Producer
> IDs leading to high CPU usage) dealt with how to expire data from the cache
> so that there was minimal lag time.
>
> But the net issue is still the underlying design/architecture.
>
> There are a couple of salient points here:
>
>    - The state of a state machine is only a view on its transactions.  This
>    is the classic stream / table dichotomy.
>    - What the "cache" is trying to do is create that view.
>    - In some cases the size of the state exceeds the storage of the cache
>    and the systems fail.
>    - The current solutions have attempted to place limits on the size of
>    the state.
>    - Errors in implementation and/or configuration will eventually lead to
>    "problem producers".
>    - Under the adopted fixes and current slate of proposals, the "problem
>    producers" solutions have cascading side effects on properly behaved
>    producers (e.g. dropping long-running, slow-producing producers).
>
> For decades (at least since the 1980s, and anecdotally since the 1960s)
> there has been a solution to processing state where the size of the state
> exceeded the memory available.  It is the solution that drove the idea that
> you could have tables in Kafka.  The idea that we can store the hot PIDs in
> memory using an LRU and write data to storage so that we can quickly find
> things not in the cache is not new.  It has been proven.
>
> I am arguing that we should not throw away state data because we are
> running out of memory.  We should persist that data to disk and consider
> the disk as the source of truth for state.
>
> Claude
>
>
> On Wed, May 15, 2024 at 7:42 PM Justine Olshan wrote:
>
> > +1 to the comment.
> >
> > > I still feel we are doing all of this only because of a few
> > > anti-pattern or misconfigured producers and not because we have “too
> > > many Producers”. I believe that implementing a Producer heartbeat and
> > > removing short-lived PIDs from the cache if we didn’t receive a
> > > heartbeat would be simpler and a step in the right direction to improve
> > > the idempotent logic, and maybe try to make PIDs get reused between
> > > sessions, which would implement a real idempotent producer instead of
> > > an idempotent session. I admit this wouldn’t help with old clients but
> > > it will put us on the right path.
> >
> > This issue is very complicated and I appreciate the attention on it.
> > Hopefully we can find a good solution working together :)
> >
> > Justine
> >
> > On Wed, May 15, 2024 at 8:36 AM Omnia Ibrahim 
> > wrote:
> >
> > > Also in the rejection alternatives you listed an approved KIP which is
> a
> > > bit confusing can you move this to motivations instead
> > >
> > > > On 15 May 2024, at 14:35, Claude Warren  wrote:
> > > >
> > > > This is a proposal that should solve the OOM problem on the servers
> > > without
> > > > some of the other proposed KIPs being active.
> > > >
> > > > Full details in
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1044%3A+A+proposal+to+change+idempotent+producer+--+server+implementation
> > >
> > >
> >
>
>
> --
> LinkedIn: http://www.linkedin.com/in/claudewarren
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2913

2024-05-17 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-16762) SyncGroup API for upgrading ConsumerGroup

2024-05-17 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16762.
-
Fix Version/s: 3.8.0
 Reviewer: David Jacot
   Resolution: Fixed

> SyncGroup API for upgrading ConsumerGroup
> -
>
> Key: KAFKA-16762
> URL: https://issues.apache.org/jira/browse/KAFKA-16762
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1042 support for wildcard when creating new acls

2024-05-17 Thread Claude Warren
>
> This has implications for execution complexity: If we can't compute
> whether two patterns overlap, then we need to run both of them on each
> piece of input to test if they both match. Under the current
> LITERAL/PREFIX system, we can optimize execution with a trie, but that
> option wouldn't be available to us with MATCH.
>

If we consider the case of an asterisk representing 1 or more characters,
then determining whether 2 patterns overlap can be fairly simple and very fast.
Let's call the text from the ACL the target, and the text from the wildcard
matcher the candidate.

When a wildcard pattern, excluding '*', is created:

   - the candidate text is broken into fragments separated by characters
   that are not Character.isLetterOrDigit() (see
   https://docs.oracle.com/javase/8/docs/api/java/lang/Character.html).
   - fragments that contain 1 character are ignored.
   - fragments that contain 2 or more characters are scanned, and every
   pair of characters is used to create a Hasher (see
   https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/Hasher.html);
   that Hasher is added to a Bloom filter associated with the wildcard pattern.

When a target is being evaluated and matching wildcard entries are to be
located, split the target and create a Bloom filter using the same strategy
as for the wildcard patterns above.  This filter will have had more pairs of
characters added to it than the filter of any matching wildcard pattern.

Each filter thus encodes the unique pairs found in the fragments of its
original text.

Now we can check the Bloom filters.

To find potential matching patterns we can check to see if the Bloom filter
for the pattern is contained within the Bloom filter for the entry text.
If so, then we know that it is likely that the pairs of characters specified
in the wildcard's non-wildcard text appear in the entry text.

At this point we can evaluate the reduced number of patterns to see which
ones actually match.

Having reduced the candidates to the matching patterns, we can then use a
standard measure of similarity like the Levenshtein distance to determine
which candidates require the fewest edits to evolve into the target.  The
one with the fewest edits is the most specific and should be applied.
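
A simplified, self-contained sketch of the fragment/pair scheme described
above, using a plain hash set in place of the commons-collections Bloom
filter (the set makes the containment test exact, where the Bloom filter
trades exactness for fixed memory):

{code:java}
import java.util.HashSet;
import java.util.Set;

// A pattern can only match a target if every letter/digit pair from the
// pattern's fragments also occurs among the target's fragments. A real
// implementation would hash each pair into a Bloom filter instead.
public class PairFragmentIndex {

    // Break text into letter/digit fragments and collect each adjacent pair
    // from fragments of length >= 2 (1-character fragments are ignored).
    static Set<String> pairs(String text) {
        Set<String> result = new HashSet<>();
        StringBuilder fragment = new StringBuilder();
        for (char c : (text + "\u0000").toCharArray()) { // sentinel flushes the last fragment
            if (Character.isLetterOrDigit(c)) {
                fragment.append(c);
            } else {
                for (int i = 0; i + 1 < fragment.length(); i++)
                    result.add(fragment.substring(i, i + 2));
                fragment.setLength(0);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> target = pairs("nl-accounts-localtopic");
        Set<String> pattern = pairs("*-accounts-*"); // '*' is not a letter or digit
        // Candidate check: the pattern's pairs must all appear in the target's.
        System.out.println("possible match: " + target.containsAll(pattern));
    }
}
{code}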

Now if you want to know which patterns overlap you have a similar process.

Each Bloom filter can calculate the estimated number of unique fragments
that were added to it.  If the filters are sorted by this estimation, the
ones with the highest counts may contain the ones with the lowest counts,
but a lower count can never contain a higher count.  The calculation to
perform the estimation can be skipped and the cardinality value of each
Bloom filter can be used instead.  You can then check the smaller filters
against the larger ones and find where one candidate is contained within
another.

If you want to know whether they intersect, the BloomFilter from
commons-collections has an estimateIntersection as well as an estimateUnion
method.  Quick tests can be made between filters to see if there is any
overlap before more complex analysis is performed.

The solution here does not make the final comparisons easier, it simply
reduces the search space to find the items that need to be compared.


On Mon, May 6, 2024 at 9:03 PM Greg Harris 
wrote:

> Hi Murali,
>
> Thanks for the KIP!
>
> I think I understand the motivation for this KIP in situations where
> there are a "cross product" of topics for two or more variables X and
> Y, and want to write ACLs for each of the variable axes.
> If you format your topics "X-Y-suffix", it's not easy to write rules
> that apply to all "Y" topics, because you need to enumerate all of the
> "X" values, and the problem persists even if you reorder the topic
> name.
>
> In my recent work on KIP-986 I found it necessary to introduce
> "namespaces" to group topics together, and I was going to replicate
> the ACL system to specify those namespaces. This change to the ACL
> system could increase the expressiveness and complexity of that
> feature, if it is ever implemented.
> One of the primitives I needed when specifying namespaces was the
> ability to tell when two namespaces overlapped (i.e. does there exist
> any topic which is present in both namespaces). This is trivial to do
> with the current PREFIX and LITERAL system, as we can find the
> maximum-length common prefix with just some length comparisons and
> equality checks.
> I considered specifying namespaces via regular expressions, and found
> that it was computationally much more difficult. Computing the
> intersection of two regexes appears to be exponential in the length of
> the regexes, leading me to avoid adding it.
>
> I understand that you're not suggesting full REGEX support, and that
> "namespaces" don't need to support MATCH, but I think MATCH may run
> into related difficulties. Any MATCH can overlap with any other MATCH
> or PREFIX if it includes 

[jira] [Created] (KAFKA-16789) Thread leak detection checks for incorrect QuorumController thread name

2024-05-17 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16789:
-

 Summary: Thread leak detection checks for incorrect 
QuorumController thread name
 Key: KAFKA-16789
 URL: https://issues.apache.org/jira/browse/KAFKA-16789
 Project: Kafka
  Issue Type: Test
Reporter: Gaurav Narula


[PR-11417|https://github.com/apache/kafka/pull/11417] introduced thread leak 
detection for the QuorumController event queue thread. Later, 
[PR-13390|https://github.com/apache/kafka/pull/13390] changed the conventions 
around thread names used in KRaft. Unfortunately, the thread-leak detection bit 
wasn't updated in the latter PR.

We should update {{TestUtils::verifyNoUnexpectedThreads}} to instead check for 
event handler thread leaks by looking for the {{"event-handler"}} suffix.
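
An illustrative check of that shape (the real helper lives in the Scala
TestUtils; the exact suffix handling and assertion style here are
assumptions):

{code:java}
import java.util.Set;
import java.util.stream.Collectors;

// Sketch: scan live threads for names ending in the KRaft event-handler
// suffix and fail if any remain after a test completes.
public class EventHandlerLeakCheck {
    public static void main(String[] args) {
        Set<String> leaked = Thread.getAllStackTraces().keySet().stream()
            .map(Thread::getName)
            .filter(name -> name.endsWith("event-handler"))
            .collect(Collectors.toSet());
        if (!leaked.isEmpty()) {
            throw new AssertionError("Unexpected event handler threads: " + leaked);
        }
        System.out.println("no leaked event-handler threads");
    }
}
{code}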



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1042 support for wildcard when creating new acls

2024-05-17 Thread Muralidhar Basani
Updated the KIP with the suggestions provided by Greg and Claude.

We shall optimize matching ACLs with trie-based solutions for PREFIXED and
LITERAL ACLs. Basically, we define the trie structures, populate the trie,
and retrieve ACLs, which seems very efficient.
This way, we aim to reduce the time to authorize every call even after
introducing MATCH.
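
A minimal trie sketch of the PREFIXED/LITERAL lookup idea (illustrative
only, not the authorizer's actual data structures; for brevity, LITERAL
bindings are collected like prefixes, whereas a real lookup would also check
that a LITERAL consumed the whole topic name):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Insert ACL bindings keyed by their resource-name prefix, then collect
// every binding along the topic name's path in a single walk.
public class AclTrie {
    private final Map<Character, AclTrie> children = new HashMap<>();
    private final List<String> bindings = new ArrayList<>();

    public void insert(String prefix, String binding) {
        AclTrie node = this;
        for (char c : prefix.toCharArray())
            node = node.children.computeIfAbsent(c, k -> new AclTrie());
        node.bindings.add(binding);
    }

    public List<String> matching(String topic) {
        List<String> result = new ArrayList<>(bindings); // root holds the empty prefix
        AclTrie node = this;
        for (char c : topic.toCharArray()) {
            node = node.children.get(c);
            if (node == null) break;
            result.addAll(node.bindings);
        }
        return result;
    }

    public static void main(String[] args) {
        AclTrie trie = new AclTrie();
        trie.insert("nl-", "PREFIXED nl-");
        trie.insert("nl-accounts-localtopic", "LITERAL nl-accounts-localtopic");
        System.out.println(trie.matching("nl-accounts-localtopic"));
        // -> [PREFIXED nl-, LITERAL nl-accounts-localtopic]
    }
}
{code}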

On Thu, May 16, 2024 at 12:38 PM Claude Warren  wrote:

> I think that a Trie solution for LITERAL and PREFIX would be faster than
> attempting to use the wildcard search strategies on them.  The reasons
> being
>
>- that the wildcard search strategy may return some non-matching (false
>positive) patterns that have to be checked and ignored.
>- the size of the Bloom filters used for searching may be artificially
>inflated because of the length of the LITERAL matches (PREFIX matches too,
>but LITERAL will probably be longer anyway)
>- The WILDCARD type will require a Bloom filter, and it should be created
>once, accepting the 80-100 bytes of overhead rather than the calculation
>time on every use.
>
>
>
> On Wed, May 15, 2024 at 5:00 PM Murali Basani 
> wrote:
>
> > Thank you Haruki for the feedback.
> > There are certainly many such use cases where customers ended up creating
> > custom authorizers to handle these MATCH based patterns.
> > And regarding updating method matchingAcls, Claude updated the KIP
> > mentioning an approach, which definitely optimizes the flow, but can be
> > discussed further during the PR implementation I believe.
> >
> > Thank you Greg for the feedback.
> > I understand with trie, existing mechanisms for PREFIX/LITERAL can be
> > optimized. How about also trying matchingAcls with the new
> > implementation which Claude proposed; maybe we can improve the existing
> > flows. Probably we can see those details during PR development.
> >
> > If we don't achieve the expected results as per AuthorizerBenchmark, we
> > can drop this KIP.
> >
> > And thank you Claude for the suggestion on the new implementation.
> >
> > On Tue, May 7, 2024 at 4:37 PM Claude Warren, Jr wrote:
> >
> > > I have updated KIP-1042 with a proposal for how to reduce the time
> > > spent looking for matching wildcard patterns.  Experimentally I see a
> > > 66-75% reduction in execution time.
> > >
> > > On Mon, May 6, 2024 at 9:03 PM Greg Harris wrote:
> > >
> > > > Hi Murali,
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > I think I understand the motivation for this KIP in situations where
> > > > there are a "cross product" of topics for two or more variables X and
> > > > Y, and want to write ACLs for each of the variable axes.
> > > > If you format your topics "X-Y-suffix", it's not easy to write rules
> > > > that apply to all "Y" topics, because you need to enumerate all of
> the
> > > > "X" values, and the problem persists even if you reorder the topic
> > > > name.
> > > >
> > > > In my recent work on KIP-986 I found it necessary to introduce
> > > > "namespaces" to group topics together, and I was going to replicate
> > > > the ACL system to specify those namespaces. This change to the ACL
> > > > system could increase the expressiveness and complexity of that
> > > > feature, if it is ever implemented.
> > > > One of the primitives I needed when specifying namespaces was the
> > > > ability to tell when two namespaces overlapped (i.e. does there exist
> > > > any topic which is present in both namespaces). This is trivial to do
> > > > with the current PREFIX and LITERAL system, as we can find the
> > > > maximum-length common prefix with just some length comparisons and
> > > > equality checks.
> > > > I considered specifying namespaces via regular expressions, and found
> > > > that it was computationally much more difficult. Computing the
> > > > intersection of two regexes appears to be exponential in the length
> of
> > > > the regexes, leading me to avoid adding it.
> > > >
> > > > I understand that you're not suggesting full REGEX support, and that
> > > > "namespaces" don't need to support MATCH, but I think MATCH may run
> > > > into related difficulties. Any MATCH can overlap with any other MATCH
> > > > or PREFIX if it includes a sufficient number of wildcards. For
> > > > example:
> > > > MATCH *-accounts-* has overlap with PREFIX nl as they can both match
> > > > "nl-accounts-localtopic", but that isn't sensitive to the contents
> > > > "nl", it is true for any PREFIX.
> > > > MATCH *-accounts-* has overlap with MATCH *localtopic, as they can
> > > > both match "nl-accounts-localtopic", but that isn't actually
> sensitive
> > > > to the contents "localtopic", it's true for any MATCH which includes
> a
> > > > wildcard at the beginning.
> > > >
> > > > This has implications for execution complexity: If we can't compute
> > > > whether two patterns overlap, then we need to run both of them on
> each
> > > > piece of input to test if they both match. Under the current
> > > > LITERAL/PREFIX 

[jira] [Created] (KAFKA-16788) Resource leakage due to absence of close() call on connector start failure

2024-05-17 Thread Ashok (Jira)
Ashok created KAFKA-16788:
-

 Summary: Resource leakage due to absence of close() call on 
connector start failure
 Key: KAFKA-16788
 URL: https://issues.apache.org/jira/browse/KAFKA-16788
 Project: Kafka
  Issue Type: Bug
  Components: connect
Reporter: Ashok
Assignee: Ashok


We have identified a potential issue in the WorkerConnector class of the 
Connect framework. Specifically, the close() method is not being called on the 
connector when the connector fails to start for various reasons. This 
omission prevents the connector from releasing any resources that were created 
or started as part of the start() method. As a result, these resources remain 
allocated even after the connector has failed to start, leading to resource 
leakage.  

To address this issue, we propose modifying the WorkerConnector class to ensure 
that the close() method is called on the connector whenever the connector fails 
to start. This change will allow the connector to properly release its 
resources, preventing resource leakage.
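
A hedged sketch of the proposed shape (stub types; the real WorkerConnector
internals and method names differ):

{code:java}
// If start() throws, make sure the cleanup hook still runs so resources
// allocated during startup are released.
public class StartFailureCleanupSketch {

    interface LifecycleConnector {
        void start() throws Exception; // may allocate threads, clients, etc.
        void close();                  // releases whatever start() allocated
    }

    static boolean startConnector(LifecycleConnector connector) {
        try {
            connector.start();
            return true;
        } catch (Exception e) {
            try {
                connector.close(); // the call the report says is missing today
            } catch (Exception suppressed) {
                e.addSuppressed(suppressed);
            }
            System.err.println("Connector failed to start: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        boolean started = startConnector(new LifecycleConnector() {
            @Override public void start() throws Exception {
                throw new Exception("simulated start failure");
            }
            @Override public void close() {
                System.out.println("resources released despite start failure");
            }
        });
        System.out.println("started = " + started);
    }
}
{code}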

*Steps to Reproduce:*  
 # Initiate a connector that creates or allocates resources (for instance, 
threads) during the execution of the start() method.
 # Generate a problem that, during the start() process, either triggers an 
exception or invokes the raiseError(Exception e) method of the 
WorkerConnectorContext.
 # Notice that the close() method is not invoked on the connector, resulting in 
resource leakage, as the stop() method is where the resources are typically 
closed.

In our scenario, the issue was related to threads not being properly closed. 
These threads were initiated as part of the start() method in the connector.

*Expected Result:*  

When a connector fails to start, the close() method should be called to allow 
the connector to release its resources.  

*Actual Result:*  

The close() method is not called when a connector fails to start, leading to 
resource leakage. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-17 Thread Nick Telford
Hi everyone,

As discussed on the Zoom call, we're going to handle rebalance metadata by:

- On start-up, Streams will open each store and read its changelog offsets
into an in-memory cache. This cache will be shared among all StreamThreads.
- On rebalance, the cache will be consulted for Task offsets for any Task
that is not active on any instance-local StreamThreads. If the Task is
active on *any* instance-local StreamThread, we will report the Task lag as
"up to date" (i.e. -1), because we know that the local state is currently
up-to-date.

We will avoid caching offsets across restarts in the legacy ".checkpoint"
file, so that we can eliminate the logic for handling this class. If
performance of opening/closing many state stores is poor, we can
parallelise it by forking off a thread for each Task directory when reading
the offsets.
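
A rough sketch of that parallel start-up scan, with hypothetical names
(readCommittedOffsets stands in for opening a store and reading its
changelog offsets; none of this is the actual Streams code):

{code:java}
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OffsetScanSketch {

    // Shared cache that all StreamThreads could consult during rebalance.
    static final Map<String, Long> OFFSET_CACHE = new ConcurrentHashMap<>();

    // Stand-in for opening the store and reading its committed offset.
    static long readCommittedOffsets(File taskDir) {
        return taskDir.getName().hashCode() & 0xffffL; // placeholder value
    }

    public static void main(String[] args) throws Exception {
        List<File> taskDirs = List.of(new File("0_0"), new File("0_1"), new File("1_0"));
        ExecutorService pool = Executors.newFixedThreadPool(taskDirs.size());
        List<Future<?>> scans = new ArrayList<>();
        for (File dir : taskDirs) {
            // One thread per task directory, as suggested above.
            scans.add(pool.submit(() ->
                OFFSET_CACHE.put(dir.getName(), readCommittedOffsets(dir))));
        }
        for (Future<?> scan : scans) scan.get(); // wait for every directory
        pool.shutdown();
        System.out.println("cached offsets: " + OFFSET_CACHE);
    }
}
{code}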

I'll update the KIP later today to reflect this design, but I will try to
keep it high-level, so that the exact implementation can vary.

Regards,

Nick

On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman 
wrote:

> 103: I like the idea of immediately deprecating #managesOffsets and aiming
> to make offset management mandatory in the long run. I assume we would also
> log a warning for any custom stores that return "false" from this method to
> encourage custom store implementations to start doing so? My only
> question/concern is that if we want folks to start managing their own
> offsets then we should make this transition easy for them, perhaps by
> exposing some public utility APIs for things that are currently handled by
> Kafka Streams such as reading/writing checkpoint files. Maybe it would be
> useful to include a small example in the KIP of what it would actually mean
> to "manage your own offsets" -- I know (all too well) that plugging in
> custom storage implementations is not easy and most people who do this are
> probably fairly advanced users, but offset management will be a totally new
> ballgame to most people and this kind of feels like throwing them
> off the deep end. We should at least provide a lifejacket via some kind of
> utility API and/or example
>
> 200. There's been a lot of back and forth on the rebalance metadata/task
> lag computation question, so forgive me if I missed any part of this, but I
> think we've landed at the right idea here. To summarize: the "tl;dr"
> explanation is that we'll write the checkpoint file only on close and will
> account for hard-crash scenarios by opening up the stores on startup and
> writing a checkpoint file for any missing tasks. Does that sound about
> right?
>
> A few clarifications:
> I think we're all more or less on the same page here but just to be
> absolutely clear, the task lags for each task directory found on disk will
> be reported by only one of the StreamThreads, and each StreamThread will
> report lags only for tasks that it already owns or are not assigned to any
> other StreamThread in the client. In other words, we only need to get the
> task lag for completely unassigned/unlocked tasks, which means if there is
> a checkpoint file at all then it must be up-to-date, because there is no
> other StreamThread actively writing to that state store (if so then only
> that StreamThread would report lag for that particular task).
>
> This still leaves the "no checkpoint at all" case which, as previously
> mentioned, can occur after a hard crash. Luckily we only have to worry
> about this once, after starting up again following said hard crash. We can
> simply open up each of the state stores before ever joining the group, get
> the offsets from rocksdb, and write them to a new checkpoint file. After
> that, we can depend on the checkpoints written at close and won't have to
> open up any stores that aren't already assigned for the reasons laid out in
> the paragraph above.
>
> As for the specific mechanism and which thread-does-what, since there were
> some questions, this is how I'm imagining the process:
>
>    1. The general idea is that we simply go through each task directory
>    with state but no checkpoint file and open the StateStore, call
>    #committedOffset, and then write it to the checkpoint file. We can then
>    close these stores and let things proceed as normal.
>    2. This only has to happen once, during startup, but we have two
>    options:
>       1. Do this from KafkaStreams#start, i.e. before we even create the
>       StreamThreads.
>       2. Do this from StreamThread#start, following a similar lock-based
>       approach to the one used in #computeTaskLags, where each StreamThread
>       just makes a pass over the task directories on disk and attempts to
>       lock them one by one. If they obtain the lock, check whether there is
>       state but no checkpoint, and write the checkpoint if needed. If it
>       can't grab the lock, then we know one of the other StreamThreads must
>       be handling the checkpoint file for that task directory, and we can 

Re: please let me subscribe

2024-05-17 Thread Harry Fallows
Hi Greg,

Thank you for your help! I've used the correct subscription API now

 (consumer.subscribe(Arrays.asList("dev-subscr...@kafka.apache.org"));).

Apologies for the spam.

Harry Fallows

Software Engineer Idiot

On Thu, 16 May 2024 at 22:28, Greg Harris  wrote:

> Hi Harry,
>
> Thanks for your interest in Apache Kafka. You can see how to subscribe
> to the mailing list(s) here: https://kafka.apache.org/contact
> Please note that the email for subscribing/unsubscribing is different
> from the one used for sending and receiving emails once you're on the
> list.
> Also, subscribing needs a follow-up email to confirm the subscription,
> see here: https://www.apache.org/foundation/mailinglists#subscribing
>
> Hope this helps!
> Greg
>
> On Thu, May 16, 2024 at 5:51 AM Harry Fallows wrote:
> >
> > please
> >
> > Harry Fallows
> >
> > Software Engineer
> >
> > Email: hfall...@thoughtmachine.net
> >
> > Web: www.thoughtmachine.net
> >
> >
> > 7 Herbrand Street | London | WC1N 1EX
> >
>



Re: [DISCUSS] KIP-1044: A proposal to change idempotent producer -- server implementation

2024-05-17 Thread Claude Warren
I think that the point here is that the design that assumes that you can
keep all the PIDs in memory for all server configurations and all usages
and all client implementations is fraught with danger.

Yes, there are solutions already in place (KIP-854) that attempt to address
this problem, and other proposed solutions to remove that have undesirable
side effects (e.g. Heartbeat interrupted by IP failure for a slow producer
with a long delay between posts).  KAFKA-16229 (Slow expiration of Producer
IDs leading to high CPU usage) dealt with how to expire data from the cache
so that there was minimal lag time.

But the net issue is still the underlying design/architecture.

There are a couple of salient points here:

   - The state of a state machine is only a view on its transactions.  This
   is the classic stream / table dichotomy.
   - What the "cache" is trying to do is create that view.
   - In some cases the size of the state exceeds the storage of the cache
   and the systems fail.
   - The current solutions have attempted to place limits on the size of
   the state.
   - Errors in implementation and/or configuration will eventually lead to
   "problem producers".
   - Under the adopted fixes and current slate of proposals, the "problem
   producers" solutions have cascading side effects on properly behaved
   producers (e.g. dropping long-running, slow-producing producers).

For decades (at least since the 1980s, and anecdotally since the 1960s)
there has been a solution to processing state where the size of the state
exceeded the memory available.  It is the solution that drove the idea that
you could have tables in Kafka.  The idea that we can store the hot PIDs in
memory using an LRU and write data to storage so that we can quickly find
things not in the cache is not new.  It has been proven.

I am arguing that we should not throw away state data because we are
running out of memory.  We should persist that data to disk and consider
the disk as the source of truth for state.
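
A compact sketch of the pattern being argued for, with hypothetical names
(PidState, PidStore); the disk store stays the source of truth and the LRU
only holds the hot set:

{code:java}
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative LRU-with-spill: evicted producer state is persisted rather
// than thrown away, so a slow producer can always be recovered from disk.
public class PidLruSketch {

    record PidState(int lastSequence, long lastTimestampMs) {}

    // Stand-in for a disk-backed store; a real one would use files or RocksDB.
    static class PidStore {
        private final Map<Long, PidState> disk = new HashMap<>();
        void persist(long pid, PidState s) { disk.put(pid, s); }
        PidState load(long pid) { return disk.get(pid); }
    }

    static class PidCache extends LinkedHashMap<Long, PidState> {
        private final int maxInMemory;
        private final PidStore store;

        PidCache(int maxInMemory, PidStore store) {
            super(16, 0.75f, true); // access-order gives LRU semantics
            this.maxInMemory = maxInMemory;
            this.store = store;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<Long, PidState> eldest) {
            if (size() > maxInMemory) {
                store.persist(eldest.getKey(), eldest.getValue()); // spill, don't drop
                return true;
            }
            return false;
        }

        PidState lookup(long pid) {
            PidState s = get(pid);
            return s != null ? s : store.load(pid); // cold PIDs come from disk
        }
    }

    public static void main(String[] args) {
        PidCache cache = new PidCache(2, new PidStore());
        cache.put(1L, new PidState(0, 1L));
        cache.put(2L, new PidState(0, 2L));
        cache.put(3L, new PidState(0, 3L)); // evicts PID 1 to the store
        System.out.println("PID 1 recoverable: " + (cache.lookup(1L) != null));
    }
}
{code}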

Claude


On Wed, May 15, 2024 at 7:42 PM Justine Olshan 
wrote:

> +1 to the comment.
>
> > I still feel we are doing all of this only because of a few anti-pattern
> > or misconfigured producers and not because we have “too many Producers”.
> > I believe that implementing a Producer heartbeat and removing short-lived
> > PIDs from the cache if we didn’t receive a heartbeat would be simpler and
> > a step in the right direction to improve the idempotent logic, and maybe
> > try to make PIDs get reused between sessions, which would implement a
> > real idempotent producer instead of an idempotent session. I admit this
> > wouldn’t help with old clients but it will put us on the right path.
>
> This issue is very complicated and I appreciate the attention on it.
> Hopefully we can find a good solution working together :)
>
> Justine
>
> On Wed, May 15, 2024 at 8:36 AM Omnia Ibrahim 
> wrote:
>
> > Also in the rejection alternatives you listed an approved KIP which is a
> > bit confusing can you move this to motivations instead
> >
> > > On 15 May 2024, at 14:35, Claude Warren  wrote:
> > >
> > > This is a proposal that should solve the OOM problem on the servers
> > without
> > > some of the other proposed KIPs being active.
> > >
> > > Full details in
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1044%3A+A+proposal+to+change+idempotent+producer+--+server+implementation
> >
> >
>


-- 
LinkedIn: http://www.linkedin.com/in/claudewarren