[PR] MINOR: Fix `KafkaStreams#streamThreadLeaveConsumerGroup` logging [kafka]

2023-10-10 Thread via GitHub


lkokhreidze opened a new pull request, #14526:
URL: https://github.com/apache/kafka/pull/14526

   Fixes logging for `KafkaStreams#streamThreadLeaveConsumerGroup`.
   
   In order not to lose the trace of the whole exception, `Exception e` is 
passed as the second argument, while the pre-formatted message is passed as a 
string as the first argument. With this, we won't lose the stack trace of the 
exception.
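   
   For illustration, a minimal SLF4J sketch of this pattern (class and method names here are hypothetical, not the actual KafkaStreams code):
   
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class LeaveGroupLoggingSketch {
       private static final Logger log = LoggerFactory.getLogger(LeaveGroupLoggingSketch.class);
   
       void logLeaveGroupFailure(final String threadName, final Exception e) {
           // Before: the exception is folded into the formatted message, so its stack trace is lost.
           log.warn(String.format("Stream thread %s failed to leave the consumer group: %s", threadName, e));
   
           // After: pre-formatted message first, exception second, so SLF4J appends the full stack trace.
           log.warn(String.format("Stream thread %s failed to leave the consumer group", threadName), e);
       }
   }
   ```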
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-10 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng updated KAFKA-15569:

Description: 
* Update test and add test cases in IQv2StoreIntegrationTest.
 * Originally, all key-value pairs were confined to a single window, with all 
data added at WINDOW_START. To improve our testing, we've expanded to multiple 
windows.
 * We've added four key-value pairs at intervals starting from WINDOW_START: 
at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START + 
18min. We're doing this to test the query methods of RangeQuery and 
WindowRangeQuery.

  was:
* Update test and add test cases in IQv2StoreIntegrationTest
 * Originally, all key-value pairs were confined to a single window, with all 
data added at WINDOW_START. To improve our testing, we've expanded to multiple 
windows
 * We've added four key-value pairs at intervals starting from WINDOW_START: 
at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START + 
18min. We're doing this to test the query methods of RangeQuery and 
WindowRangeQuery.


> Update test and add test cases in IQv2StoreIntegrationTest
> --
>
> Key: KAFKA-15569
> URL: https://issues.apache.org/jira/browse/KAFKA-15569
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> * Update test and add test cases in IQv2StoreIntegrationTest.
>  * Originally, all key-value pairs were confined to a single window, with all 
> data added at WINDOW_START. To improve our testing, we've expanded to 
> multiple windows.
>  * We've added four key-value pairs at intervals starting from WINDOW_START: 
> at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START 
> + 18min. We're doing this to test the query methods of RangeQuery and 
> WindowRangeQuery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended

2023-10-10 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze updated KAFKA-15571:
--
Affects Version/s: 3.6.0

> StateRestoreListener#onRestoreSuspended is never called because wrapper 
> DelegatingStateRestoreListener doesn't implement onRestoreSuspended
> ---
>
> Key: KAFKA-15571
> URL: https://issues.apache.org/jira/browse/KAFKA-15571
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0, 3.6.0, 3.5.1
>Reporter: Levani Kokhreidze
>Assignee: Levani Kokhreidze
>Priority: Major
>
> With https://issues.apache.org/jira/browse/KAFKA-10575 
> `StateRestoreListener#onRestoreSuspended` was added. But local tests show 
> that it is never called because `DelegatingStateRestoreListener` was not 
> updated to call the new method.
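
For illustration, a hedged sketch of the kind of fix implied here, written against the public StateRestoreListener API (the wrapper class below is simplified and hypothetical, not the actual internal DelegatingStateRestoreListener, and it assumes onRestoreSuspended has the same (TopicPartition, String, long) shape as onRestoreEnd):

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Simplified, hypothetical wrapper: forwards every callback to the user-provided
// listener, including onRestoreSuspended, which the real wrapper did not override.
class ForwardingRestoreListener implements StateRestoreListener {
    private final StateRestoreListener userListener;

    ForwardingRestoreListener(final StateRestoreListener userListener) {
        this.userListener = userListener;
    }

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        if (userListener != null) userListener.onRestoreStart(partition, storeName, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        if (userListener != null) userListener.onBatchRestored(partition, storeName, batchEndOffset, numRestored);
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        if (userListener != null) userListener.onRestoreEnd(partition, storeName, totalRestored);
    }

    // Without an override like this, the interface's default no-op implementation is
    // used and the user's callback is never invoked.
    @Override
    public void onRestoreSuspended(final TopicPartition partition, final String storeName,
                                   final long totalRestored) {
        if (userListener != null) userListener.onRestoreSuspended(partition, storeName, totalRestored);
    }
}
{code}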



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-10 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng updated KAFKA-15569:

Description: 
* Update test and add test cases in IQv2StoreIntegrationTest
 * Originally, all key-value pairs were confined to a single window, with all 
data added at WINDOW_START. To improve our testing, we've expanded to multiple 
windows
 * We've added four key-value pairs at intervals starting from WINDOW_START: 
at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START + 
18min. We're doing this to test the query methods of RangeQuery and 
WindowRangeQuery.

  was:Update test and add test cases in IQv2StoreIntegrationTest


> Update test and add test cases in IQv2StoreIntegrationTest
> --
>
> Key: KAFKA-15569
> URL: https://issues.apache.org/jira/browse/KAFKA-15569
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> * Update test and add test cases in IQv2StoreIntegrationTest
>  * Originally, all key-value pairs were confined to a single window, with all 
> data added at WINDOW_START. To improve our testing, we've expanded to 
> multiple windows
>  * We've added four key-value pairs at intervals starting from WINDOW_START: 
> at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START 
> + 18min. We're doing this to test the query methods of RangeQuery and 
> WindowRangeQuery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15577) Reload4j | CVE-2022-45868

2023-10-10 Thread masood (Jira)
masood created KAFKA-15577:
--

 Summary: Reload4j | CVE-2022-45868
 Key: KAFKA-15577
 URL: https://issues.apache.org/jira/browse/KAFKA-15577
 Project: Kafka
  Issue Type: Bug
Reporter: masood


Maven indicates 
[CVE-2022-45868|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-45868] 
in Reload4j.jar.

[https://mvnrepository.com/artifact/ch.qos.reload4j/reload4j/1.2.19]

Could you please verify if this vulnerability affects Kafka?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0

2023-10-10 Thread Benoit Delbosc (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773911#comment-17773911
 ] 

Benoit Delbosc commented on KAFKA-15402:


The problem is still present on the latest version 3.6.0.

> Performance regression on close consumer after upgrading to 3.5.0
> -
>
> Key: KAFKA-15402
> URL: https://issues.apache.org/jira/browse/KAFKA-15402
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0, 3.6.0, 3.5.1
>Reporter: Benoit Delbosc
>Priority: Major
> Attachments: image-2023-08-24-18-51-21-720.png, 
> image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png
>
>
> Hi,
> After upgrading to Kafka client version 3.5.0, we have observed a significant 
> increase in the duration of our Java unit tests. These unit tests heavily 
> rely on the Kafka Admin, Producer, and Consumer API.
> When using Kafka server version 3.4.1, the duration of the unit tests 
> increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka 
> client 3.5.0).
> Upgrading the Kafka server to 3.5.1 shows similar results.
> I have come across the issue KAFKA-15178, which could be the culprit. I will 
> attempt to test the proposed patch.
> In the meantime, if you have any ideas that could help identify and address 
> the regression, please let me know.
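
For reference, a minimal sketch (hypothetical code, not the reporter's test suite) of timing the close path that regressed; the broker address and topic name are assumptions:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerCloseTiming {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed local broker
        props.put("group.id", "close-timing-check");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));  // assumed topic
        consumer.poll(Duration.ofMillis(100));  // join the group so close() has coordination work to do

        final long startMs = System.currentTimeMillis();
        consumer.close(Duration.ofSeconds(30));  // the call whose duration increased from client 3.4.1 to 3.5.0
        System.out.println("close() took " + (System.currentTimeMillis() - startMs) + " ms");
    }
}
{code}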



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0

2023-10-10 Thread Benoit Delbosc (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benoit Delbosc updated KAFKA-15402:
---
Affects Version/s: 3.6.0

> Performance regression on close consumer after upgrading to 3.5.0
> -
>
> Key: KAFKA-15402
> URL: https://issues.apache.org/jira/browse/KAFKA-15402
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0, 3.6.0, 3.5.1
>Reporter: Benoit Delbosc
>Priority: Major
> Attachments: image-2023-08-24-18-51-21-720.png, 
> image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png
>
>
> Hi,
> After upgrading to Kafka client version 3.5.0, we have observed a significant 
> increase in the duration of our Java unit tests. These unit tests heavily 
> rely on the Kafka Admin, Producer, and Consumer API.
> When using Kafka server version 3.4.1, the duration of the unit tests 
> increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka 
> client 3.5.0).
> Upgrading the Kafka server to 3.5.1 shows similar results.
> I have come across the issue KAFKA-15178, which could be the culprit. I will 
> attempt to test the proposed patch.
> In the meantime, if you have any ideas that could help identify and address 
> the regression, please let me know.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15576) Add 3.6.0 to broker/client and streams upgrade/compatibility tests

2023-10-10 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15576:
---
Summary: Add 3.6.0 to broker/client and streams upgrade/compatibility tests 
 (was: Add 3.2.0 to broker/client and streams upgrade/compatibility tests)

> Add 3.6.0 to broker/client and streams upgrade/compatibility tests
> --
>
> Key: KAFKA-15576
> URL: https://issues.apache.org/jira/browse/KAFKA-15576
> Project: Kafka
>  Issue Type: Task
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15576) Add 3.2.0 to broker/client and streams upgrade/compatibility tests

2023-10-10 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-15576:
--

 Summary: Add 3.2.0 to broker/client and streams 
upgrade/compatibility tests
 Key: KAFKA-15576
 URL: https://issues.apache.org/jira/browse/KAFKA-15576
 Project: Kafka
  Issue Type: Task
Reporter: Satish Duggana
 Fix For: 3.7.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15535) Add documentation of "remote.log.index.file.cache.total.size.bytes" configuration property.

2023-10-10 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773898#comment-17773898
 ] 

Satish Duggana commented on KAFKA-15535:


Thanks [~hudeqi] for checking that out. 

> Add documentation of "remote.log.index.file.cache.total.size.bytes" 
> configuration property. 
> 
>
> Key: KAFKA-15535
> URL: https://issues.apache.org/jira/browse/KAFKA-15535
> Project: Kafka
>  Issue Type: Task
>  Components: documentation
>Reporter: Satish Duggana
>Assignee: hudeqi
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> Add documentation of "remote.log.index.file.cache.total.size.bytes" 
> configuration property. 
> Please double check all the existing public tiered storage configurations. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15535) Add documentation of "remote.log.index.file.cache.total.size.bytes" configuration property.

2023-10-10 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana resolved KAFKA-15535.

Resolution: Fixed

> Add documentation of "remote.log.index.file.cache.total.size.bytes" 
> configuration property. 
> 
>
> Key: KAFKA-15535
> URL: https://issues.apache.org/jira/browse/KAFKA-15535
> Project: Kafka
>  Issue Type: Task
>  Components: documentation
>Reporter: Satish Duggana
>Assignee: hudeqi
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> Add documentation of "remote.log.index.file.cache.total.size.bytes" 
> configuration property. 
> Please double check all the existing public tiered storage configurations. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15570: Add unit tests for MemoryConfigBackingStore [kafka]

2023-10-10 Thread via GitHub


yashmayya commented on code in PR #14518:
URL: https://github.com/apache/kafka/pull/14518#discussion_r1354103192


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java:
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MemoryConfigBackingStoreTest {
+
+private static final List<String> CONNECTOR_IDS = 
Arrays.asList("connector1", "connector2");
+
+// Actual values are irrelevant here and can be used as either connector 
or task configurations
+private static final List<Map<String, String>> SAMPLE_CONFIGS = 
Arrays.asList(
+Collections.singletonMap("config-key-one", "config-value-one"),
+Collections.singletonMap("config-key-two", "config-value-two"),
+Collections.singletonMap("config-key-three", "config-value-three")
+);
+
+@Mock
+private ConfigBackingStore.UpdateListener configUpdateListener;
+private final MemoryConfigBackingStore configStore = new 
MemoryConfigBackingStore();

Review Comment:
   JUnit creates a new instance of the test class for each test method run 
(exactly so that state isn't shared between tests), so this shouldn't really 
make a difference.
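   
   To illustrate with a tiny, hypothetical JUnit 4 test (not part of this PR): each @Test method runs on its own fresh instance, so instance fields are re-initialized per test.
   
   ```java
   import static org.junit.Assert.assertEquals;
   
   import org.junit.Test;
   
   public class InstancePerMethodExample {
       // Re-created for every test method, because JUnit instantiates the test class once per @Test.
       private int counter = 0;
   
       @Test
       public void firstTest() {
           assertEquals(1, ++counter);
       }
   
       @Test
       public void secondTest() {
           // Still 1: nothing carried over from firstTest.
           assertEquals(1, ++counter);
       }
   }
   ```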



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Disable flaky kraft-test in FetchRequestTest [kafka]

2023-10-10 Thread via GitHub


dengziming opened a new pull request, #14525:
URL: https://github.com/apache/kafka/pull/14525

   *More detailed description of your change*
   We introduced a bunch of flaky tests in #14295, which pass when 
running locally but always fail in CI. Let's roll them back until we find 
the cause. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15507) adminClient should not throw retriable exception when closing instance

2023-10-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15507.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> adminClient should not throw retriable exception when closing instance
> --
>
> Key: KAFKA-15507
> URL: https://issues.apache.org/jira/browse/KAFKA-15507
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 3.5.1
>Reporter: Luke Chen
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.7.0
>
>
> When adminClient is closing the instance, it'll first set 
> `hardShutdownTimeMs` to a positive timeout value, and then wait for the 
> existing threads to complete within the timeout. However, during this 
> wait, if a new caller tries to invoke a new command on the adminClient, it'll 
> immediately get a 
> {code:java}
> TimeoutException("The AdminClient thread is not accepting new calls.")
> {code}
> There are some issues with this design:
> 1. Since the `TimeoutException` is a retriable exception, the caller will 
> enter a tight loop and keep retrying it.
> 2. The error message is confusing. What does "the adminClient is not 
> accepting new calls" mean?
> We should improve it by throwing a non-retriable error (e.g. 
> IllegalStateException), and the error message should clearly state that the 
> adminClient is closing.
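
A hedged sketch of the proposed change in behavior (the field name hardShutdownTimeMs comes from the description above; everything else below is illustrative, not the actual KafkaAdminClient internals):

{code:java}
// Illustrative only: rejecting a call submitted after close() has started.
final class CallGateSketch {
    private volatile long hardShutdownTimeMs = 0; // set to a positive value once close() is invoked

    void enqueue(final Runnable call) {
        if (hardShutdownTimeMs > 0) {
            // Before (problematic): org.apache.kafka.common.errors.TimeoutException is retriable,
            // so callers enter a tight retry loop.
            // throw new TimeoutException("The AdminClient thread is not accepting new calls.");

            // After: a non-retriable exception with a self-explanatory message.
            throw new IllegalStateException("The AdminClient is closing and cannot accept new calls.");
        }
        // ... hand the call to the AdminClient thread ...
    }
}
{code}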



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing [kafka]

2023-10-10 Thread via GitHub


showuon merged PR #14455:
URL: https://github.com/apache/kafka/pull/14455


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing [kafka]

2023-10-10 Thread via GitHub


showuon commented on PR #14455:
URL: https://github.com/apache/kafka/pull/14455#issuecomment-1756715114

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove StreamsProducer flush under EOS [kafka]

2023-10-10 Thread via GitHub


github-actions[bot] commented on PR #13994:
URL: https://github.com/apache/kafka/pull/13994#issuecomment-1756711475

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13756: Connect validate endpoint should return proper validatio… [kafka]

2023-10-10 Thread via GitHub


github-actions[bot] commented on PR #13813:
URL: https://github.com/apache/kafka/pull/13813#issuecomment-1756711573

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-10 Thread via GitHub


showuon merged PR #14482:
URL: https://github.com/apache/kafka/pull/14482


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-10 Thread via GitHub


showuon commented on PR #14482:
URL: https://github.com/apache/kafka/pull/14482#issuecomment-1756672787

   Failed tests are unrelated and also failed in the trunk build.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15570: Add unit tests for MemoryConfigBackingStore [kafka]

2023-10-10 Thread via GitHub


kpatelatwork commented on code in PR #14518:
URL: https://github.com/apache/kafka/pull/14518#discussion_r1353833312


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java:
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MemoryConfigBackingStoreTest {
+
+private static final List<String> CONNECTOR_IDS = 
Arrays.asList("connector1", "connector2");
+
+// Actual values are irrelevant here and can be used as either connector 
or task configurations
+private static final List<Map<String, String>> SAMPLE_CONFIGS = 
Arrays.asList(
+Collections.singletonMap("config-key-one", "config-value-one"),
+Collections.singletonMap("config-key-two", "config-value-two"),
+Collections.singletonMap("config-key-three", "config-value-three")
+);
+
+@Mock
+private ConfigBackingStore.UpdateListener configUpdateListener;
+private final MemoryConfigBackingStore configStore = new 
MemoryConfigBackingStore();

Review Comment:
   should this be declared non-final and instantiated in the `setup()` instead 
so we don't share state between tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353833261


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>
+// - If current state timestamp exists and retention period 
has passed since group became Empty,
+//   expire all offsets with no pending offset commit;
+// - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+//   since the last commit timestamp, expire the offset
+return Optional.of(new OffsetExpirationConditionImpl(
+offsetAndMetadata -> 
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs))
+);
+} else if (usesConsumerGroupProtocol() && 
subscribedTopics.isPresent() && isInState(STABLE)) {
+// Consumers exist in the group and group is Stable =>
+// - If the group is aware of the subscribed topics and 
retention period had passed since the
+//   last commit timestamp, expire the offset. offset with 
pending offset commit are not
+//   expired
+return Optional.of(new 
OffsetExpirationConditionImpl(offsetAndMetadata -> 
offsetAndMetadata.commitTimestampMs));
+}
+} else {
+// protocolType is None => standalone (simple) consumer, that uses 
Kafka for offset storage only
+// expire offsets where retention period has passed since their 
last commit

Review Comment:
   nit: periods and capitalization



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832984


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>
+// - If current state timestamp exists and retention period 
has passed since group became Empty,
+//   expire all offsets with no pending offset commit;
+// - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+//   since the last commit timestamp, expire the offset
+return Optional.of(new OffsetExpirationConditionImpl(
+offsetAndMetadata -> 
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs))
+);
+} else if (usesConsumerGroupProtocol() && 
subscribedTopics.isPresent() && isInState(STABLE)) {
+// Consumers exist in the group and group is Stable =>
+// - If the group is aware of the subscribed topics and 
retention period had passed since the
+//   last commit timestamp, expire the offset. offset with 
pending offset commit are not

Review Comment:
   nit: Offsets* with pending o...are not expired.* 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832660


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>
+// - If current state timestamp exists and retention period 
has passed since group became Empty,
+//   expire all offsets with no pending offset commit;
+// - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+//   since the last commit timestamp, expire the offset
+return Optional.of(new OffsetExpirationConditionImpl(
+offsetAndMetadata -> 
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs))
+);
+} else if (usesConsumerGroupProtocol() && 
subscribedTopics.isPresent() && isInState(STABLE)) {
+// Consumers exist in the group and group is Stable =>
+// - If the group is aware of the subscribed topics and 
retention period had passed since the

Review Comment:
   nit: has*



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832416


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>

Review Comment:
   nit: No consumers* exist* in the group, also do we wanna name it members to 
be consistent with the rest of the code terminology? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353831509


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.

Review Comment:
   nit: if*



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353831027


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestampMs = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!group.isSubscribedToTopic(topic)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestampMs, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records).toString());
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+
+if (!expiredPartitions.isEmpty()) {
+log.info("[GroupId {}] Expiring offsets of partitions 
(hasAllOffsetsExpired={}): {}",

Review Comment:
   The placement of hasAllOffsetsExpired seems a bit off for the logging 
message: `[GroupId 12345] Expiring offsets of partitions 
(hasAllOffsetsExpired=false): partition1, partition2, partition3` is how 
it would look, right? Can we move it to the end of the list, or to the 
beginning of this line, if you think that makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353831281


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -635,6 +639,21 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return state() == ConsumerGroupState.EMPTY;
+}
+
+/**
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.

Review Comment:
   nit: if*
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353830271


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestampMs = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!group.isSubscribedToTopic(topic)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestampMs, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records).toString());
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+
+if (!expiredPartitions.isEmpty()) {
+log.info("[GroupId {}] Expiring offsets of partitions 
(hasAllOffsetsExpired={}): {}",

Review Comment:
   Correct me if I'm wrong, this is just for my understanding: unless all the 
offsets are expired, we don't delete the group, right? In cases where 
hasAllOffsetsExpired is false and expiredPartitions is non-empty, the group 
won't be deleted, but the tombstone records will still be appended? And the 
next time we iterate through the partitions, the ones with a tombstone record 
aren't included, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353823509


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestampMs = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);

Review Comment:
   nit: haveAllOffsetsExpired



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] System Tests for KIP-848 [kafka]

2023-10-10 Thread via GitHub


rreddy-22 opened a new pull request, #14524:
URL: https://github.com/apache/kafka/pull/14524

   Added configs and custom decorators to facilitate testing of the old 
protocol with the new group coordinator (KIP-848) in KRaft mode. 
   The new coordinator doesn't support ZooKeeper mode, hence this 
combination will be skipped.
   **Files Changed:**
   1) kafka.py: Added an argument called use_new_coordinator to the Kafka 
service, which can be set in the matrix of every test. The default value for 
this config is false.
   2) util.py: Added a custom decorator that skips the test when zk=true 
and use_new_coordinator=true, since this combination isn't supported.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353679164


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.

Review Comment:
   yes
   
   ```
   groupMetadataManager.groupIds().forEach(groupId -> {
   if (offsetMetadataManager.cleanupExpiredOffsets(groupId, 
records)) {
   groupMetadataManager.maybeDeleteGroup(groupId, records);
   }
   });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-10 Thread via GitHub


jeffkbkim commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1756480353

   @jolshan this is flaky in trunk as well


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-10 Thread via GitHub


jolshan commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1756470166

   I restarted the build. But can we take a look at 
   
`[kafka.server.FetchRequestTest.testLastFetchedEpochValidationV12(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14417/14/testReport/junit/kafka.server/FetchRequestTest/Build___JDK_11_and_Scala_2_13___testLastFetchedEpochValidationV12_String__quorum_kraft/)`
   
   It seems to be failing on all the versions that ran for build 14.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended

2023-10-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773849#comment-17773849
 ] 

Matthias J. Sax commented on KAFKA-15571:
-

Oops... Thanks for reporting this, and for the PR!

> StateRestoreListener#onRestoreSuspended is never called because wrapper 
> DelegatingStateRestoreListener doesn't implement onRestoreSuspended
> ---
>
> Key: KAFKA-15571
> URL: https://issues.apache.org/jira/browse/KAFKA-15571
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Levani Kokhreidze
>Assignee: Levani Kokhreidze
>Priority: Major
>
> With https://issues.apache.org/jira/browse/KAFKA-10575 
> `StateRestoreListener#onRestoreSuspended` was added. But local tests show 
> that it is never called because `DelegatingStateRestoreListener` was not 
> updated to call the new method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]

2023-10-10 Thread via GitHub


CalvinConfluent commented on PR #14053:
URL: https://github.com/apache/kafka/pull/14053#issuecomment-1756411287

   Thanks @hachikuji, I verified that the failed unit test passes locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]

2023-10-10 Thread via GitHub


kirktrue commented on PR #14357:
URL: https://github.com/apache/kafka/pull/14357#issuecomment-1756404090

   @lianetm @philipnee @dajac we can start reviewing this PR now. I'll keep 
working on the unit tests in the background.
   
   I resolved a bunch of comments that pertained to a former implementation 
approach that we abandoned. If any are still relevant, feel free to un-resolve 
them.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]

2023-10-10 Thread via GitHub


kirktrue commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1353544098


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java:
##
@@ -85,122 +111,162 @@ enum ReconciliationResult {
 private final BlockingQueue backgroundEventQueue;
 private Optional inflightCallback;
 
-public MemberAssignmentReconciler(LogContext logContext,
-  SubscriptionState subscriptions,
-  ConsumerMetadata metadata,
-  BlockingQueue 
backgroundEventQueue) {
+AssignmentReconciler(LogContext logContext,
+ SubscriptionState subscriptions,
+ ConsumerMetadata metadata,
+ BlockingQueue backgroundEventQueue) {
 this.log = logContext.logger(getClass());
 this.subscriptions = subscriptions;
 this.metadata = metadata;
 this.backgroundEventQueue = backgroundEventQueue;
 }
 
 /**
- * Perform the revocation process, if necessary, depending on the given 
{@link Assignment target assignment}. If the
- * {@link SubscriptionState#assignedPartitions() current set of assigned 
partitions} includes entries that are
- * not in the target assignment, these will be considered for 
revocation. If there is already a
- * reconciliation in progress (revocation or assignment), this method will 
return without performing any
- * revocation.
+ * Perform the reconciliation process, as necessary to meet the given 
{@link Assignment target assignment}. Note
+ * that the reconciliation is a multi-step process, and this method should 
be invoked on each heartbeat if
+ * the coordinator provides a {@link Assignment target assignment}.
  *
  * @param assignment Target {@link Assignment}
  * @return {@link ReconciliationResult}
  */
-ReconciliationResult revoke(Optional assignment) {
+ReconciliationResult maybeReconcile(Optional assignment) {
 // Check for any outstanding operations first. If a conclusive result 
has already been reached, return that
 // before processing any further.
-Optional inflightStatus = checkInflightStatus();
+if (inflightCallback.isPresent()) {
+// We don't actually need the _result_ of the event, just to know 
that it's complete.
+if (inflightCallback.get().future().isDone()) {
+// This is the happy path--we completed the callback. Clear 
out our inflight callback first, though.
+inflightCallback = Optional.empty();

Review Comment:
   Is the `HeartbeatRequestManager` going to call `AssignmentReconciler.lose()` 
to drop the partitions at that point? If so, this is the code at the top of 
that `lose()` method:
   
   ```java
   ReconciliationResult lose() {
   // Clear the inflight callback reference. This is done regardless of 
if one existed; if there was one it is
   // now abandoned because we're going to "lose" our partitions. This 
will also allow us to skip the inflight
   // check the other steps take.
   inflightCallback = Optional.empty();
   
   . . . 
   ```
   
   Does that seem sufficient, or is more needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]

2023-10-10 Thread via GitHub


kirktrue commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1353534627


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java:
##
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.LosePartitionsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * {@code AssignmentReconciler} performs the work of reconciling this 
consumer's partition assignment as directed
+ * by the consumer group coordinator. When the coordinator determines that a 
change to the partition ownership of
+ * the group is required, it will communicate with each consumer to relay its 
respective target
+ * assignment, that is, the set of partitions for which that consumer should 
now assume ownership. It is the then the
+ * responsibility of the consumer to work toward that target by performing the 
necessary internal modifications to
+ * satisfy the assignment from the coordinator. In practical terms, this means 
that it must first determine the set
+ * difference between the {@link SubscriptionState#assignedPartitions() 
current assignment} and the
+ * {@link Assignment#assignedTopicPartitions() target assignment}.
+ *
+ * 
+ *
+ * Internally, reconciliation is a multi-step process:
+ *
+ * 
+ * Calculating partitions to revoke
+ * Invoking {@link 
ConsumerRebalanceListener#onPartitionsRevoked(Collection)}
+ * Removing those partitions from its {@link 
SubscriptionState#assignFromSubscribed(Collection) assignment}
+ * Perform a heartbeat acknowledgement with the group coordinator
+ * Calculating partitions to assign
+ * Invoking {@link 
ConsumerRebalanceListener#onPartitionsAssigned(Collection)}
+ * Adding those partitions to its {@link 
SubscriptionState#assignFromSubscribed(Collection) assignment}
+ * Perform a heartbeat acknowledgement with the group coordinator
+ * 
+ *
+ * 
+ *
+ * Because the target assignment from the group coordinator is 
declarative, the implementation of the
+ * reconciliation process is idempotent. The caller of this class is free to 
invoke {@link #maybeReconcile(Optional)}
+ * repeatedly for as long as the group coordinator provides an {@link 
Assignment}.
+ *
+ * 
+ * 
+ * {@link ReconciliationResult#UNCHANGED}: no changes were made to the 
set of partitions.
+ * 
+ * 
+ * {@link ReconciliationResult#RECONCILING}: changes to the assignment 
have started. In practice this means
+ * that the appropriate {@link ConsumerRebalanceListener} callback 
method is being invoked.
+ * 
+ * 
+ * {@link ReconciliationResult#APPLIED_LOCALLY}: the {@link 
ConsumerRebalanceListener} callback method was made and
+ * the changes were applied locally.
+ * 
+ * 
+ *
+ * The comparison against the {@link SubscriptionState#assignedPartitions() 
current set of assigned partitions} and
+ * the {@link Assignment#assignedTopicPartitions() t

Re: [PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]

2023-10-10 Thread via GitHub


kirktrue commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1353533180


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java:
##
@@ -44,21 +45,46 @@
 import java.util.stream.Collectors;
 
 /**
- * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to 
first determine, and then modify the
- * current set of assigned {@link TopicPartition partitions} via {@link 
SubscriptionState}. Reconciliation is a
- * two-part process, the first being a revocation of partitions, followed by 
assignment of partitions. Each of the two
- * steps may result in one of the following:
+ * {@code AssignmentReconciler} performs the work of reconciling this 
consumer's partition assignment as directed
+ * by the consumer group coordinator. When the coordinator determines that a 
change to the partition ownership of
+ * the group is required, it will communicate with each consumer to relay its 
respective target
+ * assignment, that is, the set of partitions for which that consumer should 
now assume ownership. It is then the
+ * responsibility of the consumer to work toward that target by performing the 
necessary internal modifications to
+ * satisfy the assignment from the coordinator. In practical terms, this means 
that it must first determine the set
+ * difference between the {@link SubscriptionState#assignedPartitions() 
current assignment} and the
+ * {@link Assignment#assignedTopicPartitions() target assignment}.
+ *
+ * 
+ *
+ * Internally, reconciliation is a multi-step process:
+ *
+ * 
+ * Calculating partitions to revoke
+ * Invoking {@link 
ConsumerRebalanceListener#onPartitionsRevoked(Collection)}

Review Comment:
   Yes, that has been split off into KAFKA-15573.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Update test and add test cases in IQv2StoreIntegrationTest [kafka]

2023-10-10 Thread via GitHub


hanyuzheng7 opened a new pull request, #14523:
URL: https://github.com/apache/kafka/pull/14523

   Update test and add test cases in IQv2StoreIntegrationTest
   
   Change the input key-value pair timestamp and add more test cases.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration

2023-10-10 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15575:
---

 Summary: Prevent Connectors from exceeding tasks.max configuration
 Key: KAFKA-15575
 URL: https://issues.apache.org/jira/browse/KAFKA-15575
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Greg Harris


The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
enumerate tasks configurations. This takes an argument which comes from the 
tasks.max connector config. This is the Javadoc for that method:
{noformat}
/**
 * Returns a set of configurations for Tasks based on the current configuration,
 * producing at most {@code maxTasks} configurations.
 *
 * @param maxTasks maximum number of configurations to generate
 * @return configurations for Tasks
 */
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
{noformat}
This includes the constraint that the number of tasks is at most maxTasks, but 
this constraint is not enforced by the framework.

 

We should begin enforcing this constraint by dropping configs that exceed the 
limit, and logging a warning. For sink connectors this should harmlessly 
rebalance the consumer subscriptions onto the remaining tasks. For source 
connectors that distribute their work via task configs, this may result in an 
interruption in data transfer.
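
A rough sketch of the enforcement being proposed, to make the intent concrete (the class and method names below are illustrative only, not existing Connect runtime code):
{code:java}
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative helper the runtime could apply to the connector-provided task configs.
public class TaskConfigLimiter {

    private static final Logger log = LoggerFactory.getLogger(TaskConfigLimiter.class);

    public static List<Map<String, String>> enforceTasksMax(String connectorName,
                                                            List<Map<String, String>> taskConfigs,
                                                            int maxTasks) {
        if (taskConfigs.size() <= maxTasks) {
            return taskConfigs;
        }
        log.warn("Connector {} generated {} task configs, but tasks.max is {}; dropping the excess configs",
                connectorName, taskConfigs.size(), maxTasks);
        return taskConfigs.subList(0, maxTasks);
    }
}
{code}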



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2023-10-10 Thread via GitHub


plazma-prizma commented on code in PR #14491:
URL: https://github.com/apache/kafka/pull/14491#discussion_r1353388791


##
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java:
##
@@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder 
createPrincipalBuilder(Map config

KerberosShortNamer kerberosShortNamer,

SslPrincipalMapper sslPrincipalMapper) {
 Class principalBuilderClass = (Class) 
configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
-final KafkaPrincipalBuilder builder;
+KafkaPrincipalBuilder builder;
 
 if (principalBuilderClass == null || principalBuilderClass == 
DefaultKafkaPrincipalBuilder.class) {
 builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, 
sslPrincipalMapper);
 } else if 
(KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
-builder = (KafkaPrincipalBuilder) 
Utils.newInstance(principalBuilderClass);
+try {
+Constructor constructor = 
principalBuilderClass.getConstructor(KerberosShortNamer.class, 
SslPrincipalMapper.class);
+builder = (KafkaPrincipalBuilder) 
constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);

Review Comment:
   Thanks for the changes. LGTM.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] More image replay test cases [kafka]

2023-10-10 Thread via GitHub


rondagostino commented on PR #14206:
URL: https://github.com/apache/kafka/pull/14206#issuecomment-1756151303

   @ahuang98 I still saw lots of test failures, and when I checked them locally 
I actually was able to reproduce the below failures.  Can you take a look to 
see if you also agree these fail locally and then, assuming you agree, 
investigate as to why?
   ```
   testTopicDualWriteDelta()
   testTopicDualWriteSnapshot()
   testControllerFailover()
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15571 / `StateRestoreListener#onRestoreSuspended` is never called because `DelegatingStateRestoreListener` doesn't implement `onRestoreSuspended` [kafka]

2023-10-10 Thread via GitHub


lkokhreidze commented on PR #14519:
URL: https://github.com/apache/kafka/pull/14519#issuecomment-1756120126

   Hi @cadonna,
   I've added an integration test. It seemed the safest way to test out the new 
functionality.
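   
   For reviewers less familiar with the API, a minimal user-side listener along these lines is what exercises the now-fixed delegation path (an illustrative sketch only; the method signatures follow the `StateRestoreListener` interface as of 3.5+, please double-check against the exact version in use):
   
   ```java
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.streams.processor.StateRestoreListener;
   
   // Illustrative listener: before this fix, onRestoreSuspended() was never forwarded
   // to listeners registered via KafkaStreams#setGlobalStateRestoreListener.
   public class LoggingRestoreListener implements StateRestoreListener {
   
       @Override
       public void onRestoreStart(TopicPartition partition, String storeName,
                                  long startingOffset, long endingOffset) {
           System.out.printf("Restore started: %s %s [%d, %d]%n",
                   storeName, partition, startingOffset, endingOffset);
       }
   
       @Override
       public void onBatchRestored(TopicPartition partition, String storeName,
                                   long batchEndOffset, long numRestored) {
           System.out.printf("Restored %d records for %s %s%n", numRestored, storeName, partition);
       }
   
       @Override
       public void onRestoreEnd(TopicPartition partition, String storeName, long totalRestored) {
           System.out.printf("Restore finished: %s %s, total %d%n", storeName, partition, totalRestored);
       }
   
       @Override
       public void onRestoreSuspended(TopicPartition partition, String storeName, long totalRestored) {
           System.out.printf("Restore suspended: %s %s, total %d%n", storeName, partition, totalRestored);
       }
   }
   ```
   It would be registered with `KafkaStreams#setGlobalStateRestoreListener(new LoggingRestoreListener())`.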


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Replace GroupState with MembershipManager [kafka]

2023-10-10 Thread via GitHub


philipnee commented on code in PR #14390:
URL: https://github.com/apache/kafka/pull/14390#discussion_r1353232858


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -180,19 +180,19 @@ CompletableFuture sendAutoCommit(final 
Map offsets;
 private final String groupId;
-private final GroupState.Generation generation;
+private final int memberEpoch;

Review Comment:
   We could do it either way - recreate this RequestState object or resend the 
same object with the updated epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15548) Handling close() properly

2023-10-10 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15548:
---
Labels: consumer-threading-refactor kip-848 kip-848-client-support 
kip-848-e2e kip-848-preview  (was: consumer-threading-refactor kip-848 
kip-848-client-support kip-848-preview)

> Handling close() properly
> -
>
> Key: KAFKA-15548
> URL: https://issues.apache.org/jira/browse/KAFKA-15548
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
>
> Upon closing of the {{Consumer}} we need to:
>  # Complete pending commits
>  # Auto-commit if needed
>  # Send the last GroupConsumerHeartbeatRequest with epoch = -1 to leave the 
> group
>  # Close any fetch sessions on the brokers
>  # Poll the NetworkClient to complete pending I/O
> There is a mechanism introduced in PR 
> [14406|https://github.com/apache/kafka/pull/14406] that allows for performing 
> network I/O on shutdown. The new method 
> {{DefaultBackgroundThread.runAtClose()}} will be executed when 
> {{Consumer.close()}} is invoked.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15534) Propagate client response time when timeout to the request handler

2023-10-10 Thread Yi Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yi Ding updated KAFKA-15534:

Labels: consumer-threading-refactor kip-848 kip-848-client-support 
kip-848-e2e kip-848-preview  (was: consumer-threading-refactor)

> Propagate client response time when timeout to the request handler
> --
>
> Key: KAFKA-15534
> URL: https://issues.apache.org/jira/browse/KAFKA-15534
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
>
> Currently, we don't have a good way to propagate the response time to the 
> handler when timeout is thrown.
> {code:java}
> unsent.handler.onFailure(new TimeoutException(
> "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); 
> {code}
> The current request manager invoke a system call to retrieve the response 
> time, which is not idea because it is already available at network client
> This is an example of the coordinator request manager:
> {code:java}
> unsentRequest.future().whenComplete((clientResponse, throwable) -> {
> long responseTimeMs = time.milliseconds();
> if (clientResponse != null) {
> FindCoordinatorResponse response = (FindCoordinatorResponse) 
> clientResponse.responseBody();
> onResponse(responseTimeMs, response);
> } else {
> onFailedResponse(responseTimeMs, throwable);
> }
> }); {code}
> But in the networkClientDelegate, we should utilize the currentTimeMs in the 
> trySend to avoid calling time.milliseconds():
> {code:java}
> private void trySend(final long currentTimeMs) {
> ...
> unsent.handler.onFailure(new TimeoutException(
> "Failed to send request after " + unsent.timer.timeoutMs() + " ms."));
> continue;
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15548) Handling close() properly

2023-10-10 Thread Yi Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yi Ding updated KAFKA-15548:

Labels: consumer-threading-refactor kip-848 kip-848-client-support 
kip-848-preview  (was: consumer-threading-refactor kip-)

> Handling close() properly
> -
>
> Key: KAFKA-15548
> URL: https://issues.apache.org/jira/browse/KAFKA-15548
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-preview
>
> Upon closing of the {{Consumer}} we need to:
>  # Complete pending commits
>  # Auto-commit if needed
>  # Send the last GroupConsumerHeartbeatRequest with epoch = -1 to leave the 
> group
>  # Close any fetch sessions on the brokers
>  # Poll the NetworkClient to complete pending I/O
> There is a mechanism introduced in PR 
> [14406|https://github.com/apache/kafka/pull/14406] that allows for performing 
> network I/O on shutdown. The new method 
> {{DefaultBackgroundThread.runAtClose()}} will be executed when 
> {{Consumer.close()}} is invoked.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15548) Handling close() properly

2023-10-10 Thread Yi Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yi Ding updated KAFKA-15548:

Labels: consumer-threading-refactor kip-  (was: consumer-threading-refactor)

> Handling close() properly
> -
>
> Key: KAFKA-15548
> URL: https://issues.apache.org/jira/browse/KAFKA-15548
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-
>
> Upon closing of the {{Consumer}} we need to:
>  # Complete pending commits
>  # Auto-commit if needed
>  # Send the last GroupConsumerHeartbeatRequest with epoch = -1 to leave the 
> group
>  # Close any fetch sessions on the brokers
>  # Poll the NetworkClient to complete pending I/O
> There is a mechanism introduced in PR 
> [14406|https://github.com/apache/kafka/pull/14406] that allows for performing 
> network I/O on shutdown. The new method 
> {{DefaultBackgroundThread.runAtClose()}} will be executed when 
> {{Consumer.close()}} is invoked.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Replace GroupState with MembershipManager [kafka]

2023-10-10 Thread via GitHub


dajac commented on code in PR #14390:
URL: https://github.com/apache/kafka/pull/14390#discussion_r1353179173


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -180,19 +180,19 @@ CompletableFuture sendAutoCommit(final 
Map offsets;
 private final String groupId;
-private final GroupState.Generation generation;
+private final int memberEpoch;

Review Comment:
   > Stale member epcoh error isn't retriable
   
   A stale member epoch error is retriable... but it should be retried with the 
new epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-10 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1353117588


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+processor.init(null);
+// push four items with increasing timestamps to the primary 
stream; this should emit null-joined items
+// w1 = {}
+// w2 = {}
+// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 
3:B3 (ts: 1003) }
+// w2 = {}
+final long time = 1000L;
+for (int i = 0; i < expectedKeys.length; i++) {
+inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], 
time + i);
+}
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "B0+null", 1000L),
+new KeyValueTimestamp<>(1, "B1+null", 1001L),
+new KeyValueTimestamp<>(2, "B2+null", 1002L)
+);
+}

Review Comment:
   Noob question: why do we have output here? The time difference is `100ms`, 
should we only output these three if we got an event with time `1103`? Maybe 
I'm missing something



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockAp

Re: [PR] MINOR - KAFKA-15550: Validation for negative target times in offsetsForTimes [kafka]

2023-10-10 Thread via GitHub


lianetm commented on PR #14503:
URL: https://github.com/apache/kafka/pull/14503#issuecomment-1756011144

   hey @cadonna, @lucasbru. This is a very small PR adding a missing API 
validation to the new consumer `offsetsForTimes` functionality.  
   It also includes some improved comments for `updateFetchPositions`. It could 
be useful to take a look at the new code if you have some time. 
Thanks!
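   
   For reference, the added validation is essentially a pre-check on the requested target times, along these lines (an illustrative sketch rather than the exact patch; the classic consumer likewise rejects negative target timestamps with an `IllegalArgumentException`):
   
   ```java
   import java.util.Map;
   import org.apache.kafka.common.TopicPartition;
   
   final class OffsetsForTimesValidation {
       // Illustrative: fail fast on negative target timestamps before any request is built.
       static void validateTargetTimestamps(Map<TopicPartition, Long> timestampsToSearch) {
           for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
               if (entry.getValue() < 0) {
                   throw new IllegalArgumentException("The target time for partition " + entry.getKey()
                           + " is " + entry.getValue() + ". The target time cannot be negative.");
               }
           }
       }
   }
   ```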
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14879: Update system tests to use latest versions [kafka]

2023-10-10 Thread via GitHub


jolshan commented on PR #13528:
URL: https://github.com/apache/kafka/pull/13528#issuecomment-1755954984

   Sorry I must have missed this ping. :( My bad.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14879: Update system tests to use latest versions [kafka]

2023-10-10 Thread via GitHub


kirktrue commented on PR #13528:
URL: https://github.com/apache/kafka/pull/13528#issuecomment-1755953000

   This appears to be taken up by someone else, but they're doing it in a more 
automated/dynamic manner, which is better anyway.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Backport kafka 15415 [kafka]

2023-10-10 Thread via GitHub


msn-tldr closed pull request #14521: Backport kafka 15415
URL: https://github.com/apache/kafka/pull/14521


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader (#… [kafka]

2023-10-10 Thread via GitHub


msn-tldr opened a new pull request, #14522:
URL: https://github.com/apache/kafka/pull/14522

   …14384)
   
   This PR backports https://github.com/apache/kafka/pull/14384
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14879: Update system tests to use latest versions [kafka]

2023-10-10 Thread via GitHub


kirktrue closed pull request #13528: KAFKA-14879: Update system tests to use 
latest versions
URL: https://github.com/apache/kafka/pull/13528


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Backport kafka 15415 [kafka]

2023-10-10 Thread via GitHub


msn-tldr opened a new pull request, #14521:
URL: https://github.com/apache/kafka/pull/14521

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353056783


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.

Review Comment:
   Is this method only called after the groupId is verified to exist?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353055183


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.

Review Comment:
   nit: the group



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353053689


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import java.util.function.Function;
+
+public class OffsetExpirationConditionImpl implements 
OffsetExpirationCondition {
+
+/**
+ * Given an offset and metadata, obtain the base timestamp that should be 
used
+ * as the start of the offsets retention period.
+ */
+private final Function baseTimestamp;
+
+public OffsetExpirationConditionImpl(Function 
baseTimestamp) {
+this.baseTimestamp = baseTimestamp;
+}
+
+/**
+ * Determine whether an offset is expired. Older versions have an expire 
timestamp per partition. If this
+ * exists, compare against the current timestamp. Otherwise, use the base 
timestamp (either commit timestamp
+ * or current state timestamp if group is empty for generic groups) and 
check whether the offset has
+ * exceeded the offset retention.
+ *
+ * @param offset  The offset and metadata.
+ * @param currentTimestampMs  The current timestamp.
+ * @param offsetsRetentionMs  The offsets retention in milliseconds.
+ *
+ * @return Whether the given offset is expired or not.
+ */
+@Override
+public boolean isOffsetExpired(OffsetAndMetadata offset, long 
currentTimestampMs, long offsetsRetentionMs) {
+if (offset.expireTimestampMs.isPresent()) {
+// Older versions with explicit expire_timestamp field => old 
expiration semantics is used
+return currentTimestampMs >= offset.expireTimestampMs.getAsLong();
+} else {
+// Current version with no per partition retention

Review Comment:
   nit: missing period



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353048258


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -417,6 +441,39 @@ public CoordinatorResult 
deleteOffsets(
 return offsetMetadataManager.deleteOffsets(request);
 }
 
+/**
+ * For each group, remove all expired offsets. If all offsets for the 
group is removed and the group is eligible

Review Comment:
   nit: offsets for the group are* removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353045452


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
  */
 public final int genericGroupMaxSessionTimeoutMs;
 
+/**
+ * Frequency at which to check for expired offsets.
+ */
+public final long offsetsRetentionCheckIntervalMs;
+
+/**
+ * For subscribed consumers, committed offset of a specific partition will 
be expired and discarded when
+ * 1) this retention period has elapsed after the consumer group loses 
all its consumers (i.e. becomes empty);
+ * 2) this retention period has elapsed since the last time an offset 
is committed for the partition AND
+ *the group is no longer subscribed to the corresponding topic.
+ *
+ * For standalone consumers (using manual assignment), offsets will be 
expired after this retention period has
+ * elapsed since the time of last commit.
+ *
+ * Note that when a group is deleted via the DeleteGroups request, its 
committed offsets will also be deleted immediately;
+ *
+ * Also, when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's
+ * committed offsets for that topic will also be deleted without extra 
retention period

Review Comment:
   nit: missing period



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
  */
 public final int genericGroupMaxSessionTimeoutMs;
 
+/**
+ * Frequency at which to check for expired offsets.
+ */
+public final long offsetsRetentionCheckIntervalMs;
+
+/**
+ * For subscribed consumers, committed offset of a specific partition will 
be expired and discarded when
+ * 1) this retention period has elapsed after the consumer group loses 
all its consumers (i.e. becomes empty);
+ * 2) this retention period has elapsed since the last time an offset 
is committed for the partition AND
+ *the group is no longer subscribed to the corresponding topic.
+ *
+ * For standalone consumers (using manual assignment), offsets will be 
expired after this retention period has
+ * elapsed since the time of last commit.
+ *
+ * Note that when a group is deleted via the DeleteGroups request, its 
committed offsets will also be deleted immediately;
+ *
+ * Also, when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's

Review Comment:
   nit: missing period



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353045293


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -117,4 +119,16 @@ void validateOffsetFetch(
  * @param records The list of records.
  */
 void createGroupTombstoneRecords(List records);
+
+/**
+ * @return Whether the group can be deleted or not.
+ */
+boolean isEmpty();

Review Comment:
   We should fix the javadoc. Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353045205


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
  */
 public final int genericGroupMaxSessionTimeoutMs;
 
+/**
+ * Frequency at which to check for expired offsets.
+ */
+public final long offsetsRetentionCheckIntervalMs;
+
+/**
+ * For subscribed consumers, committed offset of a specific partition will 
be expired and discarded when
+ * 1) this retention period has elapsed after the consumer group loses 
all its consumers (i.e. becomes empty);
+ * 2) this retention period has elapsed since the last time an offset 
is committed for the partition AND
+ *the group is no longer subscribed to the corresponding topic.
+ *
+ * For standalone consumers (using manual assignment), offsets will be 
expired after this retention period has
+ * elapsed since the time of last commit.
+ *
+ * Note that when a group is deleted via the DeleteGroups request, its 
committed offsets will also be deleted immediately;
+ *
+ * Also, when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's

Review Comment:
   nit: missing period



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353044716


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
  */
 public final int genericGroupMaxSessionTimeoutMs;
 
+/**
+ * Frequency at which to check for expired offsets.
+ */
+public final long offsetsRetentionCheckIntervalMs;
+
+/**
+ * For subscribed consumers, committed offset of a specific partition will 
be expired and discarded when

Review Comment:
   nit: can we put a colon after the when and capitalize the T in this for the 
bullet points?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353043290


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -117,4 +119,16 @@ void validateOffsetFetch(
  * @param records The list of records.
  */
 void createGroupTombstoneRecords(List records);
+
+/**
+ * @return Whether the group can be deleted or not.
+ */
+boolean isEmpty();

Review Comment:
   The return value seems more like a use case, right? Should we update the name 
of the method or the return statement, and add the "whether the group can be deleted 
or not" part as a use case in the javadoc?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-10 Thread via GitHub


iit2009060 commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1353033902


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -540,6 +545,8 @@ class RemoteIndexCacheTest {
   "Failed to mark cache entry for cleanup after resizing cache.")
 TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
   "Failed to cleanup cache entry after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished,
+  "Failed to finish cleanup cache entry after resizing cache.")
 
 // verify no index files on remote cache dir
 TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,

Review Comment:
   @showuon  I have removed the `isCleanFinished` flag changes and caught the 
`Exception` in the method.
   cc @hudeqi 
   Please review it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-10 Thread via GitHub


iit2009060 commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1353033902


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -540,6 +545,8 @@ class RemoteIndexCacheTest {
   "Failed to mark cache entry for cleanup after resizing cache.")
 TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
   "Failed to cleanup cache entry after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished,
+  "Failed to finish cleanup cache entry after resizing cache.")
 
 // verify no index files on remote cache dir
 TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,

Review Comment:
   @showuon  I have removed the `isCleanFinished` flag and caught the 
`Exception` in the method.
   Please review it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager

2023-10-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15574:
---
Description: 
This task is to call the partition assignment reconciler from the heartbeat 
request manager, making sure to correctly query the state machine for the right 
actions.

 
The HB-reconciler interaction is 2 folded:
 * HB should send HB req when the reconciler completes callbacks
 * HB manager needs to trigger the reconciler to release assignments when 
errors occur.

All driven by the HB manager.

  was:
This task is to call the partition assignment reconciler from the heartbeat 
request manager, making sure to correctly query the state machine for the right 
actions.

 
The HB-reconciler interaction is 2 folded:
 * HB should send HB req when the reconciler completes callbacks
 * HB manager needs to trigger the reconciler to release assignments when 
errors occurs.

All driven by the HB manager.


> Integrate partition assignment reconciliation with heartbeat request manager
> 
>
> Key: KAFKA-15574
> URL: https://issues.apache.org/jira/browse/KAFKA-15574
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> This task is to call the partition assignment reconciler from the heartbeat 
> request manager, making sure to correctly query the state machine for the 
> right actions.
>  
> The HB-reconciler interaction is 2 folded:
>  * HB should send HB req when the reconciler completes callbacks
>  * HB manager needs to trigger the reconciler to release assignments when 
> errors occur.
> All driven by the HB manager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager

2023-10-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15574:
---
Description: 
This task is to call the partition assignment reconciler from the heartbeat 
request manager, making sure to correctly query the state machine for the right 
actions.

 
The HB-reconciler interaction is 2 folded:
 * HB should send HB req when the reconciler completes callbacks
 * HB manager needs to trigger the reconciler to release assignments when 
errors occurs.

All driven by the HB manager.

  was:
This task is to call the partition assignment reconciler from the heartbeat 
request manager, making sure to correctly query the state machine for the right 
actions.

 
The HB-reconciler interaction is 2 folded: *  HB should send HB req when the 
reconciler completes callbacks
 * HB manager needs to trigger the reconciler to release assignments when 
errors occurs.

All driven by the HB manager.


> Integrate partition assignment reconciliation with heartbeat request manager
> 
>
> Key: KAFKA-15574
> URL: https://issues.apache.org/jira/browse/KAFKA-15574
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> This task is to call the partition assignment reconciler from the heartbeat 
> request manager, making sure to correctly query the state machine for the 
> right actions.
>  
> The HB-reconciler interaction is 2 folded:
>  * HB should send HB req when the reconciler completes callbacks
>  * HB manager needs to trigger the reconciler to release assignments when 
> errors occurs.
> All driven by the HB manager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager

2023-10-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15574:
---
Description: 
This task is to call the partition assignment reconciler from the heartbeat 
request manager, making sure to correctly query the state machine for the right 
actions.

 
The HB-reconciler interaction is 2 folded: *  HB should send HB req when the 
reconciler completes callbacks
 * HB manager needs to trigger the reconciler to release assignments when 
errors occurs.

All driven by the HB manager.

  was:This task is to call the partition assignment reconciler from the 
heartbeat request manager, making sure to correctly query the state machine for 
the right actions.


> Integrate partition assignment reconciliation with heartbeat request manager
> 
>
> Key: KAFKA-15574
> URL: https://issues.apache.org/jira/browse/KAFKA-15574
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> This task is to call the partition assignment reconciler from the heartbeat 
> request manager, making sure to correctly query the state machine for the 
> right actions.
>  
> The HB-reconciler interaction is 2 folded: *  HB should send HB req when the 
> reconciler completes callbacks
>  * HB manager needs to trigger the reconciler to release assignments when 
> errors occurs.
> All driven by the HB manager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-10 Thread via GitHub


jeffkbkim commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1755898146

   not sure what this error is from
   ```
   
   > Task :streams:compileJava
   
   
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14417/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java:246:
 error: incompatible types: inference variable V#1 has incompatible equality 
constraints KeyValueStore,VAgg
   
   return aggregate(initializer, adder, subtractor, 
Materialized.with(keySerde, null));
   
   ^
   
 where V#1,K#1,S,VAgg,K#2,V#2 are type-variables:
   
   V#1 extends Object declared in method 
with(Serde,Serde)
   
   K#1 extends Object declared in method 
with(Serde,Serde)
   
   S extends StateStore declared in method 
with(Serde,Serde)
   
   VAgg extends Object declared in method 
aggregate(Initializer,Aggregator,Aggregator)
   
   K#2 extends Object declared in class KGroupedTableImpl
   
   V#2 extends Object declared in class KGroupedTableImpl
   
   1 error
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


philipnee commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1353024844


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
 return state.equals(MemberState.STABLE);
 }
 
+/**
+ * Take new target assignment received from the server and set it as 
targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send 
a new target
+ * member while a previous one hasn't been acknowledged by the member, so 
this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
 private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
 if (!targetAssignment.isPresent()) {
+log.debug("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
 targetAssignment = Optional.of(newTargetAssignment);
 } else {
-// Keep the latest next target assignment
-nextTargetAssignment = Optional.of(newTargetAssignment);
+transitionToFailed();
+throw new IllegalStateException("A target assignment pending to be 
reconciled already" +
+" exists.");
 }
 }
 
-private boolean hasPendingTargetAssignment() {
-return targetAssignment.isPresent() || 
nextTargetAssignment.isPresent();
-}
-
-
-/**
- * Update state and assignment as the member has successfully processed a 
new target
- * assignment.
- * This indicates the end of the reconciliation phase for the member, and 
makes the target
- * assignment the new current assignment.
- *
- * @param assignment Target assignment the member was able to successfully 
process
- */
-public void 
onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-updateAssignment(assignment);
-transitionTo(MemberState.STABLE);
-}
-
 /**
- * Update state and member info as the member was not able to process the 
assignment, due to
- * errors in the execution of the user-provided callbacks.
- *
- * @param error Exception found during the execution of the user-provided 
callbacks
+ * Returns true if the member has a target assignment being processed.
  */
-public void onAssignmentProcessFailure(Throwable error) {
-transitionTo(MemberState.FAILED);
-// TODO: update member info appropriately, to clear up whatever 
shouldn't be kept in
-//  this unrecoverable state
+private boolean hasPendingTargetAssignment() {
+return targetAssignment.isPresent();
 }
 
 private void resetEpoch() {
 this.memberEpoch = 0;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
 public MemberState state() {
 return state;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
 public AssignorSelection assignorSelection() {
 return this.assignorSelection;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
 return this.currentAssignment;
 }
 
+
+/**
+ * Assignment that the member received from the server but hasn't 
completely processed yet.
+ */
 // VisibleForTesting
 Optional targetAssignment() 
{
 return targetAssignment;
 }
 
-// VisibleForTesting
-Optional 
nextTargetAssignment() {
-return nextTargetAssignment;
-}
-
 /**
- * Set the current assignment for the member. This indicates that the 
reconciliation of the
- * target assignment has been successfully completed.
- * This will clear the {@link #targetAssignment}, and take on the
- * {@link #nextTargetAssignment} if any.
+ * This indicates that the reconciliation of the target assignment has 
been successfully

Review Comment:
   instead of "this indicates" can we say: "this is invoked when is 
successfully completed"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


philipnee commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1353015059


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
 return state.equals(MemberState.STABLE);
 }
 
+/**
+ * Take new target assignment received from the server and set it as 
targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send 
a new target
+ * member while a previous one hasn't been acknowledged by the member, so 
this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
 private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
 if (!targetAssignment.isPresent()) {
+log.debug("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
 targetAssignment = Optional.of(newTargetAssignment);
 } else {
-// Keep the latest next target assignment
-nextTargetAssignment = Optional.of(newTargetAssignment);
+transitionToFailed();
+throw new IllegalStateException("A target assignment pending to be 
reconciled already" +

Review Comment:
   Can we say: "Unable to set target assignment because ... " to help the user 
understand the cause?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2023-10-10 Thread via GitHub


rbaddam commented on PR #14504:
URL: https://github.com/apache/kafka/pull/14504#issuecomment-1755866839

   To address your concern about the specificity of the solution, I will create 
Interfaces/subclasses of KerberosPrincipalBuilder and SSLPrincipalBuilder to 
handle different types of principals and their configurations.
   
   
[https://github.com/apache/kafka/pull/14491/files](https://github.com/apache/kafka/pull/14491/files)
   
   We can apply the finalized approach to all the versions
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager

2023-10-10 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15574:
--
Description: This task is to call the partition assignment reconciler from 
the heartbeat request manager, making sure to correctly query the state machine 
for the right actions.  (was: When the group member's assignment changes and 
partitions are revoked and auto-commit is enabled, we need to ensure that the 
commit request manager is invoked to queue up the commits.)

> Integrate partition assignment reconciliation with heartbeat request manager
> 
>
> Key: KAFKA-15574
> URL: https://issues.apache.org/jira/browse/KAFKA-15574
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> This task is to call the partition assignment reconciler from the heartbeat 
> request manager, making sure to correctly query the state machine for the 
> right actions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager

2023-10-10 Thread Kirk True (Jira)
Kirk True created KAFKA-15574:
-

 Summary: Integrate partition assignment reconciliation with 
heartbeat request manager
 Key: KAFKA-15574
 URL: https://issues.apache.org/jira/browse/KAFKA-15574
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


When the group member's assignment changes and partitions are revoked and 
auto-commit is enabled, we need to ensure that the commit request manager is 
invoked to queue up the commits.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15573) Implement auto-commit on partition assignment revocation

2023-10-10 Thread Kirk True (Jira)
Kirk True created KAFKA-15573:
-

 Summary: Implement auto-commit on partition assignment revocation
 Key: KAFKA-15573
 URL: https://issues.apache.org/jira/browse/KAFKA-15573
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


Provide the Java client support for the consumer group partition assignment 
logic, including:
 * Calculate the difference between the current partition assignment and that 
returned in the {{ConsumerGroupHeartbeatResponse}} RPC response
 * Ensure we handle the case where changes to the assignment take multiple 
passes of {{RequestManager.poll()}}
 * Integrate the mechanism to invoke the user’s rebalance callback

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15573) Implement auto-commit on partition assignment revocation

2023-10-10 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15573:
--
Description: When the group member's assignment changes and partitions are 
revoked and auto-commit is enabled, we need to ensure that the commit request 
manager is invoked to queue up the commits.  (was: Provide the Java client 
support for the consumer group partition assignment logic, including:
 * Calculate the difference between the current partition assignment and that 
returned in the {{ConsumerGroupHeartbeatResponse}} RPC response
 * Ensure we handle the case where changes to the assignment take multiple 
passes of {{RequestManager.poll()}}
 * Integrate the mechanism to invoke the user’s rebalance callback

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.)

> Implement auto-commit on partition assignment revocation
> 
>
> Key: KAFKA-15573
> URL: https://issues.apache.org/jira/browse/KAFKA-15573
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> When the group member's assignment changes and partitions are revoked and 
> auto-commit is enabled, we need to ensure that the commit request manager is 
> invoked to queue up the commits.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


philipnee commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1352882295


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -21,53 +21,76 @@
 import java.util.Optional;
 
 /**
- * Manages group membership for a single member.
+ * Manages membership of a single member to a consumer group.
+ * 
  * Responsible for:
  * Keeping member state
  * Keeping assignment for the member
  * Computing assignment for the group if the member is required to do 
so
  */
 public interface MembershipManager {
 
+/**
+ * ID of the consumer group the member is part of (or wants to be part of).
+ */
 String groupId();
 
+/**
+ * Instance ID used by the member when joining the group. If non-empty, it 
will indicate that
+ * this is a static member.
+ */
 Optional groupInstanceId();
 
+/**
+ * Member ID assigned by the server to this member when it joins the 
consumer group.
+ */
 String memberId();
 
+/**
+ * Current epoch of the member, maintained by the server.
+ */
 int memberEpoch();
 
+/**
+ * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}.

Review Comment:
   Maybe "the current state of the consumer" because it might not be in a group 
right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-10 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1352879009


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java:
##
@@ -16,13 +16,48 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 
+import static 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
+
 /**
  * {@code PollResult} consist of {@code UnsentRequest} if there are requests 
to send; otherwise, return the time till
  * the next poll event.
  */
 public interface RequestManager {
+
+/**
+ * During normal operation of the {@link Consumer}, a request manager may 
need to send out network requests.
+ * Implementations can return {@link PollResult their need for network 
I/O} by returning the requests here.
+ * Because the {@code poll} method is called within the single-threaded 
context of the consumer's main network
+ * I/O thread, there should be no need for synchronization protection 
within itself or other state.
+ *
+ * 
+ *
+ * Note: no network I/O occurs in this method. The method itself 
should not block on I/O or for any
+ * other reason. This method is called from by the consumer's main network 
I/O thread. So quick execution of

Review Comment:
   Fixed.
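
   For readers skimming the thread, a minimal request manager that satisfies the 
contract described in that javadoc could look roughly like the sketch below. 
This is a hypothetical example, not code from the PR: it is assumed to live in 
the same package as RequestManager, and the poll(long) signature is assumed 
from the surrounding discussion.

    import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;

    // Hedged sketch: a request manager with nothing to send simply returns the
    // EMPTY poll result. No blocking and no network I/O happen in poll(),
    // matching the contract quoted above.
    public class NoopRequestManager implements RequestManager {
        @Override
        public PollResult poll(long currentTimeMs) {
            return PollResult.EMPTY;
        }
    }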



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-10 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1352876631


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+
+/**
+ * Background thread runnable that consumes {@link ApplicationEvent} and 
produces {@link BackgroundEvent}. It
+ * uses an event loop to consume and produce events, and poll the network 
client to handle network IO.
+ */
+public class ConsumerNetworkThread extends KafkaThread implements Closeable {
+
+private static final long MAX_POLL_TIMEOUT_MS = 5000;
+private static final String BACKGROUND_THREAD_NAME = 
"consumer_background_thread";
+private final Time time;
+private final Logger log;
+private final Supplier 
applicationEventProcessorSupplier;
+private final Supplier 
networkClientDelegateSupplier;
+private final Supplier requestManagersSupplier;
+private ApplicationEventProcessor applicationEventProcessor;
+private NetworkClientDelegate networkClientDelegate;
+private RequestManagers requestManagers;
+private volatile boolean running;
+private final IdempotentCloser closer = new IdempotentCloser();
+
+public ConsumerNetworkThread(LogContext logContext,
+ Time time,
+ Supplier 
applicationEventProcessorSupplier,
+ Supplier 
networkClientDelegateSupplier,
+ Supplier 
requestManagersSupplier) {
+super(BACKGROUND_THREAD_NAME, true);
+this.time = time;
+this.log = logContext.logger(getClass());
+this.applicationEventProcessorSupplier = 
applicationEventProcessorSupplier;
+this.networkClientDelegateSupplier = networkClientDelegateSupplier;
+this.requestManagersSupplier = requestManagersSupplier;
+}
+
+@Override
+public void run() {
+closer.assertOpen("Consumer network thread is already closed");
+running = true;
+
+try {
+log.debug("Consumer network thread started");
+
+// Wait until we're securely in the background network thread to 
initialize these objects...
+initializeResources();
+
+while (running) {
+try {
+runOnce();
+} catch (final WakeupException e) {
+log.debug("WakeupException caught, consumer network thread 
won't be interrupted");
+// swallow the wakeup exception to prevent killing the 
thread.
+}
+}
+} catch (final Throwable t) {
+log.error("The consumer network thread failed due to unexpected 
error", t);
+throw new KafkaException(t);
+}
+}
+
+void initializeResources() {
+applicationEventProcessor = applicationEventProcessorSupplier.get();
+networkClientDelegate = networkClientDelegateSupplier.get();
+requestM

Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


philipnee commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1352876691


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -21,53 +21,76 @@
 import java.util.Optional;
 
 /**
- * Manages group membership for a single member.
+ * Manages membership of a single member to a consumer group.

Review Comment:
   Can we say something like "A stateful object tracking the state of the 
consumer including: "? Membership might mean a lot of different things for 
different people.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1352875373


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -19,56 +19,91 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * 
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * Keeping member info (ex. member id, member epoch, assignment, etc.)
+ * Keeping member state as defined in {@link MemberState}.
+ * 
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+/**
+ * ID of the consumer group the member will be part of., provided when 
creating the current
+ * membership manager.
+ */
 private final String groupId;
+
+/**
+ * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+ */
 private final Optional groupInstanceId;
+
+/**
+ * Member ID assigned by the server to the member, received in a heartbeat 
response when
+ * joining the group specified in {@link #groupId}
+ */
 private String memberId;
+
+/**
+ * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+ * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+ * incremented as the member reconciles and acknowledges the assignments 
it receives.
+ */
 private int memberEpoch;
+
+/**
+ * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}
+ */
 private MemberState state;
+
+/**
+ * Assignor type selection for the member. If non-null, the member will 
send its selection to
+ * the server on the {@link ConsumerGroupHeartbeatRequest}. If null, the 
server will select a
+ * default assignor for the member, which the member does not need to 
track.
+ */
 private AssignorSelection assignorSelection;

Review Comment:
   As described in the field doc, this changed to always having a selection, 
that will default to using server-side assignor, and let the server choose the 
specific implementation to use. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-10 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1352874538


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+public class BackgroundEventProcessor {
+
+private final Logger log;
+private final BlockingQueue backgroundEventQueue;
+
+public BackgroundEventProcessor(final LogContext logContext,
+final BlockingQueue 
backgroundEventQueue) {
+this.log = logContext.logger(BackgroundEventProcessor.class);
+this.backgroundEventQueue = backgroundEventQueue;
+}
+
+/**
+ * Drains all available {@link BackgroundEvent}s, and then processes them 
in order. If any
+ * errors are thrown as a result of a {@link ErrorBackgroundEvent} or an 
error occurs while processing
+ * another type of {@link BackgroundEvent}, only the first 
exception will be thrown, all
+ * subsequent errors will simply be logged at WARN level.
+ *
+ * @throws RuntimeException or subclass
+ */
+public void process() {
+LinkedList events = new LinkedList<>();
+backgroundEventQueue.drainTo(events);
+
+RuntimeException first = null;
+int errorCount = 0;
+
+for (BackgroundEvent event : events) {
+log.debug("Consuming background event: {}", event);
+
+try {
+process(event);
+} catch (RuntimeException e) {
+errorCount++;
+
+if (first == null) {
+first = e;
+log.warn("Error #{} from background thread (will be logged 
and thrown): {}", errorCount, e.getMessage(), e);

Review Comment:
   Removed check to avoid.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -16,197 +16,78 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.ApiVersions;
-import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.GroupRebalanceConfig;
-import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
+import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
-
-import static java.util.Objects.requireNonNull;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel;
+import java.util.function.Supplier;
 
 /**
  * Background thread runnable that consumes {@code ApplicationEvent} and
  * produces {@code BackgroundEvent}. It uses an event loop to consume and
  * produce events, and poll the network client to handle network IO.
- * 
+ * 
  * It holds a ref

[PR] Update HEADER [kafka]

2023-10-10 Thread via GitHub


adikarthik opened a new pull request, #14520:
URL: https://github.com/apache/kafka/pull/14520

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]

2023-10-10 Thread via GitHub


CalvinConfluent commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1352848836


##
core/src/main/scala/kafka/cluster/Replica.scala:
##
@@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: 
TopicPartition) extends Log
* fetch request is always smaller than the leader's LEO, which can happen 
if small produce requests are received at
* high frequency.
*/
-  def updateFetchState(
+  def updateFetchStateOrThrow(
 followerFetchOffsetMetadata: LogOffsetMetadata,
 followerStartOffset: Long,
 followerFetchTimeMs: Long,
 leaderEndOffset: Long,
 brokerEpoch: Long
   ): Unit = {
 replicaState.updateAndGet { currentReplicaState =>
+  val cachedBrokerEpoch = if 
(metadataCache.isInstanceOf[KRaftMetadataCache])
+
metadataCache.asInstanceOf[KRaftMetadataCache].getAliveBrokerEpoch(brokerId) 
else Option(-1L)
+  // Fence the update if it provides a stale broker epoch.
+  if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {

Review Comment:
   Maybe allowing updates with higher broker epochs has one problem: is it 
possible for a malfunctioning/buggy broker to fetch with a much higher broker 
epoch? Then the leader would have to restart to get out of that state. Other 
than this, I don't see a problem with allowing a higher broker epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]

2023-10-10 Thread via GitHub


CalvinConfluent commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1352848836


##
core/src/main/scala/kafka/cluster/Replica.scala:
##
@@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: 
TopicPartition) extends Log
* fetch request is always smaller than the leader's LEO, which can happen 
if small produce requests are received at
* high frequency.
*/
-  def updateFetchState(
+  def updateFetchStateOrThrow(
 followerFetchOffsetMetadata: LogOffsetMetadata,
 followerStartOffset: Long,
 followerFetchTimeMs: Long,
 leaderEndOffset: Long,
 brokerEpoch: Long
   ): Unit = {
 replicaState.updateAndGet { currentReplicaState =>
+  val cachedBrokerEpoch = if 
(metadataCache.isInstanceOf[KRaftMetadataCache])
+
metadataCache.asInstanceOf[KRaftMetadataCache].getAliveBrokerEpoch(brokerId) 
else Option(-1L)
+  // Fence the update if it provides a stale broker epoch.
+  if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {

Review Comment:
   Maybe allowing updates with higher broker epochs has one problem: is it 
possible for a malfunctioning/buggy broker to fetch with a crazy broker epoch? 
Then the leader would have to restart to get out of that state. Other than 
this, I don't see a problem with allowing a higher broker epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1352815096


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,46 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>
+// - If current state timestamp exists and retention period 
has passed since group became Empty,
+//   expire all offsets with no pending offset commit;
+// - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+//   since the last commit timestamp, expire the offset
+return Optional.of(new OffsetExpirationConditionImpl(
+offsetAndMetadata -> 
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)));
+} else if (usesConsumerGroupProtocol() && 
subscribedTopics.isPresent() && isInState(STABLE)) {
+// Consumers exist in the group and group is Stable =>
+// - If the group is aware of the subscribed topics and 
retention period had passed since the
+//   last commit timestamp, expire the offset. offset with 
pending offset commit are not
+//   expired
+return Optional.of(new 
OffsetExpirationConditionImpl(offsetAndMetadata -> 
offsetAndMetadata.commitTimestampMs));

Review Comment:
   no, commitTimestampMs is a field and not a method
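
   To make the expiration logic above concrete, here is a small hedged sketch of 
how such a condition can be evaluated against the retention period. It is 
illustrative only: the "base timestamp" mirrors the lambdas in the diff (the 
group's currentStateTimestamp for Empty groups, otherwise the offset's own 
commitTimestampMs), while the class and method names are assumptions.

    // Hedged sketch: an offset expires when the retention period has elapsed
    // since the chosen base timestamp and there is no pending commit for it.
    final class OffsetExpirationSketch {
        static boolean isExpired(long baseTimestampMs, long retentionMs, long nowMs) {
            return nowMs - baseTimestampMs >= retentionMs;
        }

        public static void main(String[] args) {
            long nowMs = System.currentTimeMillis();
            long retentionMs = 7L * 24 * 60 * 60 * 1000;                  // 7 days
            long commitTimestampMs = nowMs - (8L * 24 * 60 * 60 * 1000);  // committed 8 days ago
            System.out.println(isExpired(commitTimestampMs, retentionMs, nowMs)); // prints true
        }
    }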



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15355: Message schema changes [kafka]

2023-10-10 Thread via GitHub


clolov commented on PR #14290:
URL: https://github.com/apache/kafka/pull/14290#issuecomment-1755706472

   I am putting myself as a reviewer because I would like to keep up to date 
with these changes. I will aim to provide my review tomorrow!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


lianetm commented on PR #14413:
URL: https://github.com/apache/kafka/pull/14413#issuecomment-1755659746

   Hey @dajac, this is ready for review now, including the latest changes from 
trunk. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


lianetm opened a new pull request, #14413:
URL: https://github.com/apache/kafka/pull/14413

   This PR includes:
   
   - changes for error handling, leaving responsibility in the heartbeatManager 
and exposing only the functionality for when the state needs to be updated (on 
successful HB, on fencing, on fatal failure)
   - simplified assignment handling on the assumption that the members will 
handle one assignment at a time
   - allow transitions for failures when joining
   - remove default assignor logic (left to the server)
   - tests & minor fixes addressing initial version review
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


lianetm closed pull request #14413: KAFKA-14323: Client state changes for 
handling one assignment at a time & minor improvements
URL: https://github.com/apache/kafka/pull/14413


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs

2023-10-10 Thread Lucian Ilie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucian Ilie updated KAFKA-15572:

Affects Version/s: 3.0.0
   (was: 3.3.1)
   (was: 3.4.1)

> Race condition between future log dir roll and replace current with future 
> log during alterReplicaLogDirs
> -
>
> Key: KAFKA-15572
> URL: https://issues.apache.org/jira/browse/KAFKA-15572
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.0
>Reporter: Lucian Ilie
>Priority: Major
> Attachments: kafka-alter-log-dir-nosuchfileexception.log
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, 
> using banzaicloud/koperator.
> We have multiple disks per broker.
> We are using Cruise Control remove disk operation in order to aggregate 
> multiple smaller disks into a single bigger disk.
> When we do this, *the flush operation fails apparently randomly with 
> NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a 
> sample of logs for the exception and the previous operations taking place.
> Will further detail the cause of this issue.
> Say we have 3 brokers:
>  * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger 
> disk /new-kafka-logs1/kafka
>  * broker 201 with same disks
>  * broker 301 with same disks
> When Cruise Control executes a remove disk operation, it calls Kafka 
> "adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment 
> as to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to 
> /new-kafka-logs1/kafka.
> During the alter log dir operation, future logs are created (to move data 
> from e.g. "/kafka-logs1/kafka/topic-partition" to 
> "/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and 
> finally the log dir will be renamed from 
> "/new-kafka-logs1/kafka/topic-partition.hash-future" to 
> "/new-kafka-logs1/kafka/topic-partition". This operation is started in 
> [UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713]
>  and is locked using the [UnifiedLog 
> lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
>  The rename is then delegated to 
> [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113].
>  This is the 1st code part that is involved in the race condition.
> Meanwhile, log dirs can be rolled based on known conditions (e.g. getting 
> full), which will call 
> [UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547],
>  which is locked using the [UnifiedLog 
> lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
>  However, the further delegation to UnifiedLog.flushUptoOffsetExclusive is 
> not sharing that lock, since it is [done as a scheduled task in a separate 
> thread|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547].
>  This means that further operations are [not locked at UnifiedLog 
> level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
>  The operation is further delegated to 
> [LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177],
>  which will also try to [flush the log 
> dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
>  This is the 2nd code part that is involved in the race condition.
> Since the log dir flush does not share the lock with the rename dir operation 
> (as it is scheduled via the scheduler), the rename dir operation might 
> succeed in moving the log dir on disk to "topic-partition", but the 
> LocalLog._dir will remain set to "topic-partition.hash-future", and when the 
> flush will attempt to flush the "topic-partition.hash-future" directory, it 
> will throw NoSuchFileException: "topic-partition.hash-future". Basically, 
> [this 
> line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
>  might succeed, and before [this other 
> line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
>  is executed, flush tries to [flush the future 
> dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
> We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved 
> the issue by synchronizing the flush dir operation. Will reply with a link to 
> a PR.
> Note that th

[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs

2023-10-10 Thread Lucian Ilie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucian Ilie updated KAFKA-15572:

Description: 
We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, 
using banzaicloud/koperator.

We have multiple disks per broker.
We are using Cruise Control remove disk operation in order to aggregate 
multiple smaller disks into a single bigger disk.

When we do this, *the flush operation fails apparently randomly with 
NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample 
of logs for the exception and the previous operations taking place.

Will further detail the cause of this issue.

Say we have 3 brokers:
 * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger 
disk /new-kafka-logs1/kafka
 * broker 201 with same disks
 * broker 301 with same disks

When Cruise Control executes a remove disk operation, it calls Kafka 
"adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment as 
to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to 
/new-kafka-logs1/kafka.

During the alter log dir operation, future logs are created (to move data from 
e.g. "/kafka-logs1/kafka/topic-partition" to 
"/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and 
finally the log dir will be renamed from 
"/new-kafka-logs1/kafka/topic-partition.hash-future" to 
"/new-kafka-logs1/kafka/topic-partition". This operation is started in 
[UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713]
 and is locked using the [UnifiedLog 
lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The rename is then delegated to 
[LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113].
 This is the 1st code part that is involved in the race condition.

Meanwhile, log dirs can be rolled based on known conditions (e.g. getting 
full), which will call 
[UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547],
 which is locked using the [UnifiedLog 
lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 However, the further delegation to UnifiedLog.flushUptoOffsetExclusive is not 
sharing that lock, since it is [done as a scheduled task in a separate 
thread|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547].
 This means that further operations are [not locked at UnifiedLog 
level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The operation is further delegated to 
[LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177],
 which will also try to [flush the log 
dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
 This is the 2nd code part that is involved in the race condition.

Since the log dir flush does not share the lock with the rename dir operation 
(as it is scheduled via the scheduler), the rename dir operation might succeed 
in moving the log dir on disk to "topic-partition", but the LocalLog._dir will 
remain set to "topic-partition.hash-future", and when the flush will attempt to 
flush the "topic-partition.hash-future" directory, it will throw 
NoSuchFileException: "topic-partition.hash-future". Basically, [this 
line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 might succeed, and before [this other 
line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 is executed, flush tries to [flush the future 
dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].

We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved the 
issue by synchronizing the flush dir operation. Will reply with a link to a PR.

Note that this bug replicates for every version since 3.0.0, caused by [this 
commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341]
 when flush dir was added.
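
The idea behind that fix can be sketched as follows. The real classes (UnifiedLog, 
LocalLog) are Scala; the simplified Java sketch below only illustrates making 
flush and renameDir acquire the same lock so flush never reads a stale directory 
reference. Names are illustrative, not the actual patch.

    import java.io.File;

    // Hedged illustration of the synchronization idea: if flush reads the
    // directory reference under the same lock that renameDir uses to update it,
    // flush can no longer target a ".future" path that was just renamed away.
    final class LogDirFlushSketch {
        private final Object lock = new Object();
        private File dir;

        LogDirFlushSketch(File dir) { this.dir = dir; }

        void renameDir(File newDir) {
            synchronized (lock) {
                // rename on disk first, then update the in-memory reference
                // atomically with respect to flush()
                dir = newDir;
            }
        }

        void flush() {
            synchronized (lock) {
                File current = dir;  // always the directory that exists on disk
                // fsync(current) would go here
            }
        }
    }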

  was:
We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, 
using banzaicloud/koperator.

We have multiple disks per broker.
We are using Cruise Control remove disk operation in order to aggregate 
multiple smaller disks into a single bigger disk.

When we do this, *the flush operation fails apparently randomly with 
NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample 
of logs for the exception and the previous operations taking place.

Will further detail the cause of this issue.

Say we have 3 brokers:
 * broker 101 with 

[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs

2023-10-10 Thread Lucian Ilie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucian Ilie updated KAFKA-15572:

Attachment: kafka-alter-log-dir-nosuchfileexception.log

> Race condition between future log dir roll and replace current with future 
> log during alterReplicaLogDirs
> -
>
> Key: KAFKA-15572
> URL: https://issues.apache.org/jira/browse/KAFKA-15572
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.1, 3.4.1
>Reporter: Lucian Ilie
>Priority: Major
> Attachments: kafka-alter-log-dir-nosuchfileexception.log
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, 
> using banzaicloud/koperator.
> We have multiple disks per broker.
> We are using Cruise Control remove disk operation in order to aggregate 
> multiple smaller disks into a single bigger disk.
> When we do this, *the flush operation fails apparently randomly with 
> NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a 
> sample of logs for the exception and the previous operations taking place.
> Will further detail the cause of this issue.
> Say we have 3 brokers:
>  * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger 
> disk /new-kafka-logs1/kafka
>  * broker 201 with same disks
>  * broker 301 with same disks
> When Cruise Control executes a remove disk operation, it calls Kafka 
> "adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment 
> as to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to 
> /new-kafka-logs1/kafka.
> During the alter log dir operation, future logs are created (to move data 
> from e.g. "/kafka-logs1/kafka/topic-partition" to 
> "/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and 
> finally the log dir will be renamed from 
> "/new-kafka-logs1/kafka/topic-partition.hash-future" to 
> "/new-kafka-logs1/kafka/topic-partition". This operation is started in 
> [UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713]
>  and is locked using the [UnifiedLog 
> lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
>  The rename is then delegated to 
> [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113].
>  This is the 1st code part that is involved in the race condition.
> Meanwhile, log dirs can be rolled based on known conditions (e.g. getting 
> full), which will call 
> [UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547],
>  which is locked using the [UnifiedLog 
> lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
>  However, the further delegation to UnifiedLog.flushUptoOffsetExclusive is 
> not sharing that lock, since it is done as a scheduled task in a separate 
> thread. This means that further operations are [not locked at UnifiedLog 
> level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
>  The operation is further delegated to 
> [LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177],
>  which will also try to [flush the log 
> dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
>  This is the 2nd code part that is involved in the race condition.
> Since the log dir flush does not share the lock with the rename dir operation 
> (as it is scheduled via the scheduler), the rename dir operation might 
> succeed in moving the log dir on disk to "topic-partition", but the 
> LocalLog._dir will remain set to "topic-partition.hash-future", and when the 
> flush will attempt to flush the "topic-partition.hash-future" directory, it 
> will throw NoSuchFileException: "topic-partition.hash-future". Basically, 
> [this 
> line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
>  might succeed, and before [this other 
> line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
>  is executed, flush tries to [flush the future 
> dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
> We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved 
> the issue by synchronizing the flush dir operation. Will reply with a link to 
> a PR.
> Note that this bug replicates for every version since 3.0.0, caused by [this 
> commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d

[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs

2023-10-10 Thread Lucian Ilie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucian Ilie updated KAFKA-15572:

Description: 
We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, 
using banzaicloud/koperator.

We have multiple disks per broker.
We are using Cruise Control remove disk operation in order to aggregate 
multiple smaller disks into a single bigger disk.

When we do this, *the flush operation fails apparently randomly with 
NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample 
of logs for the exception and the previous operations taking place.

Will further detail the cause of this issue.

Say we have 3 brokers:
 * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger 
disk /new-kafka-logs1/kafka
 * broker 201 with same disks
 * broker 301 with same disks

When Cruise Control executes a remove disk operation, it calls Kafka 
"adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment as 
to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to 
/new-kafka-logs1/kafka.

During the alter log dir operation, future logs are created (to move data from 
e.g. "/kafka-logs1/kafka/topic-partition" to 
"/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and 
finally the log dir will be renamed from 
"/new-kafka-logs1/kafka/topic-partition.hash-future" to 
"/new-kafka-logs1/kafka/topic-partition". This operation is started in 
[UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713]
 and is locked using the [UnifiedLog 
lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The rename is then delegated to 
[LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113].
 This is the 1st code part that is involved in the race condition.

Meanwhile, log dirs can be rolled based on known conditions (e.g. getting 
full), which will call 
[UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547],
 which is locked using the [UnifiedLog 
lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 However, the further delegation to UnifiedLog.flushUptoOffsetExclusive is not 
sharing that lock, since it is done as a scheduled task in a separate thread. 
This means that further operations are [not locked at UnifiedLog 
level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The operation is further delegated to 
[LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177],
 which will also try to [flush the log 
dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
 This is the 2nd code part that is involved in the race condition.

Since the log dir flush does not share the lock with the rename dir operation 
(as it is scheduled via the scheduler), the rename dir operation might succeed 
in moving the log dir on disk to "topic-partition", but the LocalLog._dir will 
remain set to "topic-partition.hash-future", and when the flush will attempt to 
flush the "topic-partition.hash-future" directory, it will throw 
NoSuchFileException: "topic-partition.hash-future". Basically, [this 
line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 might succeed, and before [this other 
line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 is executed, flush tries to [flush the future 
dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].

We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved the 
issue by synchronizing the flush dir operation. Will reply with a link to a PR.

Note that this bug replicates for every version since 3.0.0, caused by [this 
commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341]
 when flush dir was added.

  was:
We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, 
using banzaicloud/koperator.

We have multiple disks per broker.
We are using Cruise Control remove disk operation in order to aggregate 
multiple smaller disks into a single bigger disk.

When we do this, *the flush operation fails apparently randomly with 
NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample 
of logs for the exception and the previous operations taking place.

Will further detail the cause of this issue.

Say we have 3 brokers:
 * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger 
disk /new-kafka-logs1/kafka
 * broker 

[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs

2023-10-10 Thread Lucian Ilie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucian Ilie updated KAFKA-15572:

Description: 
We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, 
using banzaicloud/koperator.

We have multiple disks per broker.
We are using Cruise Control remove disk operation in order to aggregate 
multiple smaller disks into a single bigger disk.

When we do this, *the flush operation fails apparently randomly with 
NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample 
of logs for the exception and the previous operations taking place.

Will further detail the cause of this issue.

Say we have 3 brokers:
 * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger 
disk /new-kafka-logs1/kafka
 * broker 201 with same disks
 * broker 301 with same disks

When Cruise Control executes a remove disk operation, it calls Kafka 
"adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment as 
to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to 
/new-kafka-logs1/kafka.

During the alter log dir operation, future logs are created (to move data from 
e.g. "/kafka-logs1/kafka/topic-partition" to 
"/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and 
finally the log dir will be renamed from 
"/new-kafka-logs1/kafka/topic-partition.hash-future" to 
"/new-kafka-logs1/kafka/topic-partition". This operation is started in 
[UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713]
 and is locked using the [UnifiedLog 
lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The rename is then delegated to 
[LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113].
 This is the 1st code part that is involved in the race condition.

Meanwhile, log dirs can be rolled based on known conditions (e.g. getting 
full), which will call 
[UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547],
 which is locked using the UnifiedLog lock. However, the further delegation to 
UnifiedLog.flushUptoOffsetExclusive is not sharing that lock, since it is done 
as a scheduled task in a separate thread. This means that further operations 
are [not locked at UnifiedLog 
level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The operation is further delegated to 
[LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177],
 which will also try to [flush the log 
dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
 This is the 2nd code part that is involved in the race condition.

Since the log dir flush does not share the lock with the rename dir operation, 
the rename dir operation might succeed in moving the log dir on disk to 
"topic-partition", but the LocalLog._dir will remain set to 
"topic-partition.hash-future", and when the flush will attempt to flush the 
"topic-partition.hash-future" directory, it will throw NoSuchFileException: 
"topic-partition.hash-future". Basically, [this 
line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 might succeed, and before [this other 
line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 is executed, flush tries to [flush the future 
dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].

We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved the 
issue by synchronizing the flush dir operation. Will reply with a link to a PR.

Note that this bug replicates for every version since 3.0.0, caused by [this 
commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341]
 when flush dir was added.

  was:
We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, 
using banzaicloud/koperator.

We have multiple disks per broker.
We are using Cruise Control remove disk operation in order to aggregate 
multiple smaller disks into a single bigger disk.

When we do this, *the flush operation fails apparently randomly with 
NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample 
of logs for the exception and the previous operations taking place.

Will further detail the cause of this issue.

Say we have 3 brokers:
 * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger 
disk /new-kafka-logs1/kafka
 * broker 201 with same disks
 * broker 301 with same disks

When Cruise Control executes a remove disk operation, it calls Kafka 
"adminClient.a
