[jira] [Assigned] (KAFKA-13803) Refactor Leader API Access
[ https://issues.apache.org/jira/browse/KAFKA-13803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam reassigned KAFKA-13803: Assignee: Rittika Adhikari > Refactor Leader API Access > -- > > Key: KAFKA-13803 > URL: https://issues.apache.org/jira/browse/KAFKA-13803 > Project: Kafka > Issue Type: Improvement >Reporter: Rittika Adhikari >Assignee: Rittika Adhikari >Priority: Major > > Currently, AbstractFetcherThread has a series of protected APIs which control > access to the Leader. ReplicaFetcherThread and ReplicaAlterLogDirsThread > respectively override these protected APIs and handle access to the Leader in > a remote and local object store context. > We propose to move these protected APIs to a LeaderEndPoint interface, which > will serve all fetches from the Leader. We will implement a > RemoteLeaderEndPoint and a LocalLeaderEndPoint accordingly. This change will > greatly simplify our existing follower fetch code. -- This message was sent by Atlassian Jira (v8.20.1#820001)
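The proposed LeaderEndPoint abstraction can be sketched as follows. This is an illustration only: the Jira does not specify the API, so the method names and signatures below (fetchEarliestOffset, fetch) are assumptions, and the in-memory class stands in for the proposed RemoteLeaderEndPoint/LocalLeaderEndPoint implementations.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the fetcher thread codes against this interface,
// while remote (inter-broker) and local (log-dir) implementations plug in
// behind it. Method names are illustrative, not Kafka's actual API.
interface LeaderEndPoint {
    long fetchEarliestOffset(String topicPartition, int currentLeaderEpoch);
    Map<String, byte[]> fetch(Map<String, Long> fetchOffsets);
}

// A trivial in-memory "leader" standing in for the real implementations.
class InMemoryLeaderEndPoint implements LeaderEndPoint {
    private final Map<String, byte[]> records = new HashMap<>();

    void append(String topicPartition, byte[] data) {
        records.put(topicPartition, data);
    }

    @Override
    public long fetchEarliestOffset(String topicPartition, int currentLeaderEpoch) {
        return 0L; // this toy leader never truncates
    }

    @Override
    public Map<String, byte[]> fetch(Map<String, Long> fetchOffsets) {
        Map<String, byte[]> out = new HashMap<>();
        fetchOffsets.forEach((tp, offset) -> {
            if (records.containsKey(tp)) out.put(tp, records.get(tp));
        });
        return out;
    }
}
```

With this split, AbstractFetcherThread would depend only on the interface, and the follower fetch path no longer needs to branch on whether the leader is a remote broker or a local log directory.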
[jira] [Updated] (KAFKA-13603) empty active segment can trigger recovery after clean shutdown and restart
[ https://issues.apache.org/jira/browse/KAFKA-13603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-13603: - Issue Type: Improvement (was: Bug) > empty active segment can trigger recovery after clean shutdown and restart > -- > > Key: KAFKA-13603 > URL: https://issues.apache.org/jira/browse/KAFKA-13603 > Project: Kafka > Issue Type: Improvement >Reporter: Cong Ding >Assignee: Cong Ding >Priority: Major > Fix For: 3.2.0 > > > Within a LogSegment, the TimeIndex and OffsetIndex are lazy indices that > don't get created on disk until they are accessed for the first time. If the > active segment is empty at the time of the clean shutdown, the disk will have > only the log file but no index files. > However, Log recovery logic expects the presence of an offset index file on > disk for each segment, otherwise, the segment is considered corrupted. > We need to address this issue: create the index files for empty active > segments during clean shutdown. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13603) empty active segment can trigger recovery after clean shutdown and restart
[ https://issues.apache.org/jira/browse/KAFKA-13603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-13603: - Priority: Major (was: Minor) > empty active segment can trigger recovery after clean shutdown and restart > -- > > Key: KAFKA-13603 > URL: https://issues.apache.org/jira/browse/KAFKA-13603 > Project: Kafka > Issue Type: Bug >Reporter: Cong Ding >Assignee: Cong Ding >Priority: Major > Fix For: 3.2.0 > > > Within a LogSegment, the TimeIndex and OffsetIndex are lazy indices that > don't get created on disk until they are accessed for the first time. If the > active segment is empty at the time of the clean shutdown, the disk will have > only the log file but no index files. > However, Log recovery logic expects the presence of an offset index file on > disk for each segment, otherwise, the segment is considered corrupted. > We need to address this issue: create the index files for empty active > segments during clean shutdown. -- This message was sent by Atlassian Jira (v8.20.1#820001)
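The proposed fix can be sketched as below. This is a hypothetical illustration, not Kafka's actual segment code: it only shows the idea that, during clean shutdown, the lazy index files of an empty active segment are materialized on disk so that the recovery logic (which treats a missing offset index file as corruption) finds them on restart.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (names are illustrative): ensure the .index and
// .timeindex files exist for a segment before clean shutdown completes.
// An empty index file is a valid (zero-entry) index for an empty segment.
class LazyIndexFlusher {
    // baseOffsetName is the zero-padded base offset, e.g. "00000000000000000000"
    static void ensureIndexFiles(Path segmentDir, String baseOffsetName) throws IOException {
        Path offsetIndex = segmentDir.resolve(baseOffsetName + ".index");
        Path timeIndex = segmentDir.resolve(baseOffsetName + ".timeindex");
        if (Files.notExists(offsetIndex)) Files.createFile(offsetIndex);
        if (Files.notExists(timeIndex)) Files.createFile(timeIndex);
    }
}
```

Calling this for the active segment during shutdown is idempotent, so repeating it on an already-flushed segment is harmless.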
[jira] [Updated] (KAFKA-13701) Pin background worker threads for certain background work (ex: UnifiedLog.flush())
[ https://issues.apache.org/jira/browse/KAFKA-13701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-13701: - Description: Certain background work such as UnifiedLog.flush() need not support concurrency. Today in the existing KafkaScheduler, we are not able to pin background work to specific threads. As a result we are unable to prevent concurrent UnifiedLog.flush() calls, so we have to ensure UnifiedLog.flush() implementation is thread safe by modifying the code at subtle areas (ex: [PR #11814|https://github.com/apache/kafka/pull/11814]). The code will be simpler if instead KafkaScheduler (or alike) could support pinning of certain background work to specific threads, for example the UnifiedLog.flush() operation for the same topic-partition will go to the same thread. This will ensure strict ordering of flush() calls, thereby enabling us to write simpler code eventually. Subsequently, in this Jira we will modify the code to schedule UnifiedLog.flush() only to specific background threads always (based on topic-partition). was:Certain background work such as UnifiedLog.flush() need not support concurrency. Today in the existing KafkaScheduler, we are not able to pin background work to specific threads. As a result we are unable to prevent concurrent UnifiedLog.flush() calls, so we have to ensure UnifiedLog.flush() implementation is thread safe by modifying the code at subtle areas (ex: [PR #11814|https://github.com/apache/kafka/pull/11814]). The code will be simpler if instead KafkaScheduler (or alike) could support pinning of certain background work to specific threads, for example the UnifiedLog.flush() operation for the same topic-partition will go to the same thread. This will ensure strict ordering of flush() calls, thereby enabling us to write simpler code eventually. 
> Pin background worker threads for certain background work (ex: > UnifiedLog.flush()) > -- > > Key: KAFKA-13701 > URL: https://issues.apache.org/jira/browse/KAFKA-13701 > Project: Kafka > Issue Type: Improvement >Reporter: Kowshik Prakasam >Priority: Major > > Certain background work such as UnifiedLog.flush() need not support > concurrency. Today in the existing KafkaScheduler, we are not able to pin > background work to specific threads. As a result we are unable to prevent > concurrent UnifiedLog.flush() calls, so we have to ensure UnifiedLog.flush() > implementation is thread safe by modifying the code at subtle areas (ex: [PR > #11814|https://github.com/apache/kafka/pull/11814]). The code will be simpler > if instead KafkaScheduler (or alike) could support pinning of certain > background work to specific threads, for example the UnifiedLog.flush() > operation for the same topic-partition will go to the same thread. This will > ensure strict ordering of flush() calls, thereby enabling us to write simpler > code eventually. > Subsequently, in this Jira we will modify the code to schedule > UnifiedLog.flush() only to specific background threads always (based on > topic-partition). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13701) Pin background worker threads for certain background work (ex: UnifiedLog.flush())
Kowshik Prakasam created KAFKA-13701: Summary: Pin background worker threads for certain background work (ex: UnifiedLog.flush()) Key: KAFKA-13701 URL: https://issues.apache.org/jira/browse/KAFKA-13701 Project: Kafka Issue Type: Improvement Reporter: Kowshik Prakasam Certain background work, such as UnifiedLog.flush(), need not support concurrency. Today, the existing KafkaScheduler cannot pin background work to specific threads. As a result, we are unable to prevent concurrent UnifiedLog.flush() calls, so we have to ensure the UnifiedLog.flush() implementation is thread-safe by modifying the code in subtle areas (ex: [PR #11814|https://github.com/apache/kafka/pull/11814]). The code would be simpler if KafkaScheduler (or the like) could instead pin certain background work to specific threads, for example so that UnifiedLog.flush() operations for the same topic-partition always go to the same thread. This would ensure strict ordering of flush() calls, thereby enabling us to write simpler code eventually. -- This message was sent by Atlassian Jira (v8.20.1#820001)
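The pinning idea above can be sketched like this. PinnedScheduler is a hypothetical class for illustration (not KafkaScheduler): it hashes each key, e.g. a topic-partition name, to one of N single-threaded executors, so work for the same partition always runs serially and in submission order.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of pinning background work to a fixed thread per key.
class PinnedScheduler {
    private final ExecutorService[] workers;

    PinnedScheduler(int numThreads) {
        workers = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Deterministically map a key (e.g. "topic-0") to one worker index.
    int workerFor(String key) {
        return Math.floorMod(key.hashCode(), workers.length);
    }

    // Work for the same key lands on the same single-threaded executor,
    // so it can never run concurrently with itself and keeps submission order.
    void submit(String key, Runnable task) {
        workers[workerFor(key)].submit(task);
    }

    void shutdownAndAwait() throws InterruptedException {
        for (ExecutorService w : workers) w.shutdown();
        for (ExecutorService w : workers) w.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Under such a scheme, concurrent flush() calls for one topic-partition become impossible by construction, which is exactly the property the Jira wants in order to drop the defensive thread-safety code.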
[jira] [Commented] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance
[ https://issues.apache.org/jira/browse/KAFKA-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385712#comment-17385712 ] Kowshik Prakasam commented on KAFKA-13070: -- [~manasvigupta] I didn't realize you had assigned it to yourself. I was actually planning on working on this myself. Are you working on a fix for this relatively soon? If not, I will address it with a PR relatively soon. Please let me know. > LogManager shutdown races with periodic work scheduled by the instance > -- > > Key: KAFKA-13070 > URL: https://issues.apache.org/jira/browse/KAFKA-13070 > Project: Kafka > Issue Type: Bug >Reporter: Kowshik Prakasam >Assignee: Manasvi Gupta >Priority: Major > > In the LogManager shutdown sequence (in LogManager.shutdown()), we don't > cancel the periodic work scheduled by it prior to shutdown. As a result, the > periodic work could race with the shutdown sequence causing some unwanted > side effects. This is reproducible by a unit test in LogManagerTest. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance
[ https://issues.apache.org/jira/browse/KAFKA-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-13070: - Description: In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.

```
// set val maxLogAgeMs = 6 in the test
@Test
def testRetentionPeriodicWorkAfterShutdown(): Unit = {
  val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
  val logFile = new File(logDir, name + "-0")
  assertTrue(logFile.exists)
  log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
  log.updateHighWatermark(log.logEndOffset)
  logManager.shutdown()
  assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
  time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
  logManager = null
}
```

was: In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.

```
// set val maxLogAgeMs = 6 in the test
@Test
def testRetentionPeriodicWorkAfterShutdown(): Unit = {
  val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
  val logFile = new File(logDir, name + "-0")
  assertTrue(logFile.exists)
  log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
  log.updateHighWatermark(log.logEndOffset)
  logManager.shutdown()
  assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
  time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
  logManager = null
}
```

> LogManager shutdown races with periodic work scheduled by the instance
> --
>
> Key: KAFKA-13070
> URL: https://issues.apache.org/jira/browse/KAFKA-13070
> Project: Kafka
> Issue Type: Bug
> Reporter: Kowshik Prakasam
> Assignee: Manasvi Gupta
> Priority: Major
>
> In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.
>
> ```
> // set val maxLogAgeMs = 6 in the test
> @Test
> def testRetentionPeriodicWorkAfterShutdown(): Unit = {
>   val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
>   val logFile = new File(logDir, name + "-0")
>   assertTrue(logFile.exists)
>   log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
>   log.updateHighWatermark(log.logEndOffset)
>   logManager.shutdown()
>   assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
>   time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
>   logManager = null
> }
> ```
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance
[ https://issues.apache.org/jira/browse/KAFKA-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-13070: - Description: In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest. (was: In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest. // set val maxLogAgeMs = 6 in the test @Test def testRetentionPeriodicWorkAfterShutdown(): Unit = { val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None) val logFile = new File(logDir, name + "-0") assertTrue(logFile.exists) log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0) log.updateHighWatermark(log.logEndOffset) logManager.shutdown() assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath)) time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1) logManager = null } ```) > LogManager shutdown races with periodic work scheduled by the instance > -- > > Key: KAFKA-13070 > URL: https://issues.apache.org/jira/browse/KAFKA-13070 > Project: Kafka > Issue Type: Bug >Reporter: Kowshik Prakasam >Assignee: Manasvi Gupta >Priority: Major > > In the LogManager shutdown sequence (in LogManager.shutdown()), we don't > cancel the periodic work scheduled by it prior to shutdown. As a result, the > periodic work could race with the shutdown sequence causing some unwanted > side effects. This is reproducible by a unit test in LogManagerTest. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance
[ https://issues.apache.org/jira/browse/KAFKA-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-13070: - Description: In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.

```
// set val maxLogAgeMs = 6 in the test
@Test
def testRetentionPeriodicWorkAfterShutdown(): Unit = {
  val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
  val logFile = new File(logDir, name + "-0")
  assertTrue(logFile.exists)
  log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
  log.updateHighWatermark(log.logEndOffset)
  logManager.shutdown()
  assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
  time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
  logManager = null
}
```

was: In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.

```
// set val maxLogAgeMs = 6 in the test
@Test
def testRetentionPeriodicWorkAfterShutdown(): Unit = {
  val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
  val logFile = new File(logDir, name + "-0")
  assertTrue(logFile.exists)
  log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
  log.updateHighWatermark(log.logEndOffset)
  logManager.shutdown()
  assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
  time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
  logManager = null
}
```

> LogManager shutdown races with periodic work scheduled by the instance
> --
>
> Key: KAFKA-13070
> URL: https://issues.apache.org/jira/browse/KAFKA-13070
> Project: Kafka
> Issue Type: Bug
> Reporter: Kowshik Prakasam
> Assignee: Manasvi Gupta
> Priority: Major
>
> In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.
>
> ```
> // set val maxLogAgeMs = 6 in the test
> @Test
> def testRetentionPeriodicWorkAfterShutdown(): Unit = {
>   val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
>   val logFile = new File(logDir, name + "-0")
>   assertTrue(logFile.exists)
>   log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
>   log.updateHighWatermark(log.logEndOffset)
>   logManager.shutdown()
>   assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
>   time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
>   logManager = null
> }
> ```
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance
Kowshik Prakasam created KAFKA-13070: Summary: LogManager shutdown races with periodic work scheduled by the instance Key: KAFKA-13070 URL: https://issues.apache.org/jira/browse/KAFKA-13070 Project: Kafka Issue Type: Bug Reporter: Kowshik Prakasam In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.

```
// set val maxLogAgeMs = 6 in the test
@Test
def testRetentionPeriodicWorkAfterShutdown(): Unit = {
  val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
  val logFile = new File(logDir, name + "-0")
  assertTrue(logFile.exists)
  log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
  log.updateHighWatermark(log.logEndOffset)
  logManager.shutdown()
  assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
  time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
  logManager = null
}
```

-- This message was sent by Atlassian Jira (v8.3.4#803005)
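The fix direction implied by the report, cancelling the periodic work before tearing the manager down, can be sketched as follows. The class and field names are hypothetical, not LogManager's actual code: keep a handle to each scheduled task, cancel it, and wait for the scheduler to drain before completing shutdown, so no retention pass can race with (or run after) the shutdown sequence.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of shutdown ordering for an owner of periodic work.
class PeriodicWorkOwner {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private ScheduledFuture<?> retentionTask;

    void startup(Runnable retentionCheck, long periodMs) {
        retentionTask = scheduler.scheduleAtFixedRate(retentionCheck, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() throws InterruptedException {
        // 1. Cancel periodic work first so no new pass starts.
        if (retentionTask != null) retentionTask.cancel(false);
        // 2. Drain the scheduler so an in-flight pass finishes.
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        // 3. Only now release resources; no periodic pass can observe this.
        closed.set(true);
    }

    boolean isClosed() { return closed.get(); }
}
```

This is the general cancel-then-drain-then-close pattern; the test above reproduces the bug precisely because LogManager.shutdown() skips step 1.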
[jira] [Updated] (KAFKA-12977) Eliminate temporary ProducerStateManager in Log recovery logic
[ https://issues.apache.org/jira/browse/KAFKA-12977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12977: - Parent: KAFKA-12551 Issue Type: Sub-task (was: Improvement) > Eliminate temporary ProducerStateManager in Log recovery logic > -- > > Key: KAFKA-12977 > URL: https://issues.apache.org/jira/browse/KAFKA-12977 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > The temporary ProducerStateManager (PSM) instance created in the Log recovery > logic (inside LogLoader) is a source of complexity and confusion. For > example, when fixing KAFKA-12964 (see [PR# > 10896|https://github.com/apache/kafka/pull/10896]) we figured that there are > cases where the temporary PSM instance's state goes out of sync with the real > PSM instance (within LoadLogParams). And we need to adjust the code suitably > to handle for the consequences of these 2 instances being out of sync. To fix > this, we should just get rid of the temporary PSM instance which is used in > the following places: > # In LogLoader.recoverLog(), we could just pass in the real PSM. > # In LogLoader.completeSwapOperations(), we try to avoid recovering segment > here in [PR #10763|https://github.com/apache/kafka/pull/10763]. > # In LogLoader.loadSegmentFiles(), we probably need to clean this part of > the logic a bit. If we are missing index file or the index file is corrupted, > typically we can just rebuild the index without changing PSM. If the segment > is truncated while rebuilding the index, we actually want to follow the > process in step 1, by just removing the rest of the segments. So, we could > also get rid of the temporary PSM in this case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13068) Rename Log to UnifiedLog
Kowshik Prakasam created KAFKA-13068: Summary: Rename Log to UnifiedLog Key: KAFKA-13068 URL: https://issues.apache.org/jira/browse/KAFKA-13068 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam Once KAFKA-12554 is completed, we can rename Log -> UnifiedLog as described in the doc: [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12554) Split Log layer into Log and LocalLog
[ https://issues.apache.org/jira/browse/KAFKA-12554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12554: - Description: Split Log layer into Log and LocalLog based on the proposal described in this document: [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#]. (was: Split Log layer into UnifiedLog and LocalLog based on the proposal described in this document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#.) > Split Log layer into Log and LocalLog > - > > Key: KAFKA-12554 > URL: https://issues.apache.org/jira/browse/KAFKA-12554 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > Split Log layer into Log and LocalLog based on the proposal described in this > document: > [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12554) Split Log layer into Log and LocalLog
[ https://issues.apache.org/jira/browse/KAFKA-12554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12554: - Summary: Split Log layer into Log and LocalLog (was: Split Log layer into UnifiedLog and LocalLog) > Split Log layer into Log and LocalLog > - > > Key: KAFKA-12554 > URL: https://issues.apache.org/jira/browse/KAFKA-12554 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > Split Log layer into UnifiedLog and LocalLog based on the proposal described > in this document: > https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12977) Eliminate temporary ProducerStateManager in Log recovery logic
[ https://issues.apache.org/jira/browse/KAFKA-12977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam reassigned KAFKA-12977: Assignee: Kowshik Prakasam > Eliminate temporary ProducerStateManager in Log recovery logic > -- > > Key: KAFKA-12977 > URL: https://issues.apache.org/jira/browse/KAFKA-12977 > Project: Kafka > Issue Type: Improvement >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > The temporary ProducerStateManager (PSM) instance created in the Log recovery > logic (inside LogLoader) is a source of complexity and confusion. For > example, when fixing KAFKA-12964 (see [PR# > 10896|https://github.com/apache/kafka/pull/10896]) we figured that there are > cases where the temporary PSM instance's state goes out of sync with the real > PSM instance (within LoadLogParams). And we need to adjust the code suitably > to handle for the consequences of these 2 instances being out of sync. To fix > this, we should just get rid of the temporary PSM instance which is used in > the following places: > # In LogLoader.recoverLog(), we could just pass in the real PSM. > # In LogLoader.completeSwapOperations(), we try to avoid recovering segment > here in [PR #10763|https://github.com/apache/kafka/pull/10763]. > # In LogLoader.loadSegmentFiles(), we probably need to clean this part of > the logic a bit. If we are missing index file or the index file is corrupted, > typically we can just rebuild the index without changing PSM. If the segment > is truncated while rebuilding the index, we actually want to follow the > process in step 1, by just removing the rest of the segments. So, we could > also get rid of the temporary PSM in this case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12977) Eliminate temporary ProducerStateManager in Log recovery logic
Kowshik Prakasam created KAFKA-12977: Summary: Eliminate temporary ProducerStateManager in Log recovery logic Key: KAFKA-12977 URL: https://issues.apache.org/jira/browse/KAFKA-12977 Project: Kafka Issue Type: Improvement Reporter: Kowshik Prakasam The temporary ProducerStateManager (PSM) instance created in the Log recovery logic (inside LogLoader) is a source of complexity and confusion. For example, when fixing KAFKA-12964 (see [PR# 10896|https://github.com/apache/kafka/pull/10896]) we found that there are cases where the temporary PSM instance's state goes out of sync with the real PSM instance (within LoadLogParams), and we need to adjust the code to handle the consequences of these two instances being out of sync. To fix this, we should just get rid of the temporary PSM instance, which is used in the following places: # In LogLoader.recoverLog(), we could just pass in the real PSM. # In LogLoader.completeSwapOperations(), we try to avoid recovering the segment here in [PR #10763|https://github.com/apache/kafka/pull/10763]. # In LogLoader.loadSegmentFiles(), we probably need to clean up this part of the logic a bit. If the index file is missing or corrupted, typically we can just rebuild the index without changing the PSM. If the segment is truncated while rebuilding the index, we actually want to follow the process in step 1, by just removing the rest of the segments. So we could also get rid of the temporary PSM in this case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12955) Fix LogLoader to pass materialized view of segments for deletion
[ https://issues.apache.org/jira/browse/KAFKA-12955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12955: - Summary: Fix LogLoader to pass materialized view of segments for deletion (was: Fix LogLoader to pass materialized list of segments for deletion) > Fix LogLoader to pass materialized view of segments for deletion > > > Key: KAFKA-12955 > URL: https://issues.apache.org/jira/browse/KAFKA-12955 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Critical > > Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force > materialization of the {{segmentsToDelete}} iterable, to make sure the > results of the iteration remain valid and deterministic. We should also pass > only the materialized view to the logic that deletes the segments. Otherwise, > we could end up deleting the wrong segments asynchronously. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12955) Fix LogLoader to pass materialized list of segments for deletion
[ https://issues.apache.org/jira/browse/KAFKA-12955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12955: - Description: Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force materialization of the {{segmentsToDelete}} iterable, to make sure the results of the iteration remain valid and deterministic. We should also pass only the materialized view to the logic that deletes the segments. Otherwise, we could end up deleting the wrong segments asynchronously. (was: Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force materialization of the {{segmentsToDelete}} iterable, to make sure the results of the iteration remain valid and deterministic. We should also pass only the materialized view to the logic that deletes the segments.) > Fix LogLoader to pass materialized list of segments for deletion > > > Key: KAFKA-12955 > URL: https://issues.apache.org/jira/browse/KAFKA-12955 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Critical > > Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force > materialization of the {{segmentsToDelete}} iterable, to make sure the > results of the iteration remain valid and deterministic. We should also pass > only the materialized view to the logic that deletes the segments. Otherwise, > we could end up deleting the wrong segments asynchronously. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12955) Fix LogLoader to pass materialized list of segments for deletion
Kowshik Prakasam created KAFKA-12955: Summary: Fix LogLoader to pass materialized list of segments for deletion Key: KAFKA-12955 URL: https://issues.apache.org/jira/browse/KAFKA-12955 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force materialization of the {{segmentsToDelete}} iterable, to make sure the results of the iteration remain valid and deterministic. We should also pass only the materialized view to the logic that deletes the segments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
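The hazard described above can be illustrated with a small self-contained example (not Kafka code; the names are hypothetical). A lazy view over a live segment list is re-evaluated at iteration time, so if the list is mutated between choosing the segments and the asynchronous deletion actually iterating them, different segments get deleted. Materializing the view with collect(...) freezes the choice at the moment it is made.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative demo of lazy-view drift vs. a materialized snapshot.
class SegmentDeletionDemo {
    // Lazy view over the live segment list: re-evaluated on every iteration.
    static Iterable<String> lazyDeletable(List<String> segments) {
        return () -> segments.stream().filter(s -> s.startsWith("old-")).iterator();
    }

    // Materialized snapshot: fixed at the moment of the call.
    static List<String> materializedDeletable(List<String> segments) {
        return segments.stream().filter(s -> s.startsWith("old-")).collect(Collectors.toList());
    }

    static List<String> drain(Iterable<String> it) {
        List<String> out = new ArrayList<>();
        it.forEach(out::add);
        return out;
    }
}
```

Passing only the materialized list to the async deletion logic is what makes the set of deleted segments deterministic.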
[jira] [Updated] (KAFKA-12923) Log.splitOverflowedSegment logic can skip producer state snapshot deletion
[ https://issues.apache.org/jira/browse/KAFKA-12923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12923: - Priority: Minor (was: Major) > Log.splitOverflowedSegment logic can skip producer state snapshot deletion > -- > > Key: KAFKA-12923 > URL: https://issues.apache.org/jira/browse/KAFKA-12923 > Project: Kafka > Issue Type: Improvement >Reporter: Kowshik Prakasam >Priority: Minor > > In Log.splitOverflowedSegment logic, we probably don't have to delete > producer state snapshot > [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L2341] > since the old segment is replaced with a new segment with the same base > offset. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12923) Log.splitOverflowedSegment logic can skip producer state snapshot deletion
Kowshik Prakasam created KAFKA-12923: Summary: Log.splitOverflowedSegment logic can skip producer state snapshot deletion Key: KAFKA-12923 URL: https://issues.apache.org/jira/browse/KAFKA-12923 Project: Kafka Issue Type: Improvement Reporter: Kowshik Prakasam In Log.splitOverflowedSegment logic, we probably don't have to delete producer state snapshot [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L2341] since the old segment is replaced with a new segment with the same base offset. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12876) Log.roll() could forever delete producer state snapshot of empty active segment
Kowshik Prakasam created KAFKA-12876: Summary: Log.roll() could forever delete producer state snapshot of empty active segment Key: KAFKA-12876 URL: https://issues.apache.org/jira/browse/KAFKA-12876 Project: Kafka Issue Type: Bug Reporter: Kowshik Prakasam In Log.scala, during roll, if there is an existing segment of 0 size with the newOffsetToRoll then we end up [deleting|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1610] the active segment asynchronously. This will also delete the producer state snapshot. However, we also [take a producer snapshot|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1639] on newOffsetToRoll before we add the new segment. This addition could race with snapshot deletion and we can end up losing the snapshot forever. So, in this case the fix is to not delete the snapshot because we end up recreating it anyway. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12875) Change Log layer segment map mutations to avoid absence of active segment
Kowshik Prakasam created KAFKA-12875: Summary: Change Log layer segment map mutations to avoid absence of active segment Key: KAFKA-12875 URL: https://issues.apache.org/jira/browse/KAFKA-12875 Project: Kafka Issue Type: Improvement Reporter: Kowshik Prakasam [https://github.com/apache/kafka/pull/10650] showed a case where active segment was absent when Log layer segments were iterated. We should investigate Log layer code to see if we can change Log layer segment map mutations to avoid absence of active segment at any given point. For example, if we are clearing all segments and creating a new one, maybe we can reverse the order to create a new segment first and then clear the old ones later. -- This message was sent by Atlassian Jira (v8.3.4#803005)
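The reordering suggested above can be shown with a toy segment map. This is a hypothetical sketch, not Kafka's LogSegments code: segments are just strings keyed by base offset. Adding the replacement active segment before clearing the old entries means a concurrent reader iterating the map never observes it empty, i.e. never sees a missing active segment.

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class SegmentMapOrdering {
    public static void main(String[] args) {
        // Toy segment map: base offset -> segment (real code holds LogSegment objects).
        ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
        segments.put(0L, "segment-0");
        segments.put(50L, "segment-50");

        long newBaseOffset = 100L;
        // 1. Create the new active segment first...
        segments.put(newBaseOffset, "segment-100");
        // 2. ...then drop the old ones. Between steps 1 and 2 the map is
        // never empty, so an iterating reader always finds an active segment.
        segments.headMap(newBaseOffset).clear();

        System.out.println(segments.lastEntry().getValue()); // prints "segment-100"
        System.out.println(segments.size());                 // prints "1"
    }
}
```

The reverse order (clear everything, then add the new segment) opens exactly the window observed in https://github.com/apache/kafka/pull/10650.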
[jira] [Created] (KAFKA-12867) Trogdor ConsumeBenchWorker quits prematurely with maxMessages config
Kowshik Prakasam created KAFKA-12867: Summary: Trogdor ConsumeBenchWorker quits prematurely with maxMessages config Key: KAFKA-12867 URL: https://issues.apache.org/jira/browse/KAFKA-12867 Project: Kafka Issue Type: Bug Reporter: Kowshik Prakasam The trogdor [ConsumeBenchWorker|https://github.com/apache/kafka/commits/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java] has a bug. If one of the consumption tasks completes executing successfully due to [maxMessages being consumed|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L245], then the consumption task [notifies the doneFuture|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L285], causing the ConsumeBenchWorker to halt. This becomes a problem when more than one consumption task is running in parallel, because the successful completion of one of the tasks shuts down the entire worker while the other tasks are still running. When the worker is shut down, it [kills|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L482] all the active consumption tasks, which is not the desired behavior. The fix is to not notify the doneFuture when one of the consumption tasks completes without error. Instead, we should defer the notification to the [CloseStatusUpdater|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L299] thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
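The proposed fix amounts to completing the worker's done signal only once every task has finished, rather than when the first one succeeds. A minimal sketch of that idea, heavily simplified from Trogdor's actual classes (the task futures here are plain CompletableFutures, not Trogdor tasks):

```java
import java.util.concurrent.CompletableFuture;

public class DoneFutureSketch {
    public static void main(String[] args) {
        // Two parallel consumption tasks: task1 hits maxMessages and
        // finishes immediately; task2 is still consuming.
        CompletableFuture<String> task1 =
                CompletableFuture.completedFuture("task1: maxMessages reached");
        CompletableFuture<String> task2 = new CompletableFuture<>();

        CompletableFuture<String> doneFuture = new CompletableFuture<>();
        // Fixed behaviour: notify doneFuture only after ALL tasks complete
        // (the buggy version completed it as soon as task1 finished,
        // shutting down the worker and killing task2).
        CompletableFuture.allOf(task1, task2)
                .thenRun(() -> doneFuture.complete(""));

        System.out.println(doneFuture.isDone()); // task2 still running -> "false"
        task2.complete("task2: done");
        System.out.println(doneFuture.isDone()); // all tasks finished -> "true"
    }
}
```

In the real worker this deferral would live in the status-updater thread that already waits on every task, as the ticket suggests.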
[jira] [Commented] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17339766#comment-17339766 ] Kowshik Prakasam commented on KAFKA-12755: -- cc [~satishd] with whom we discussed some of these today. > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Attachments: out.jpg, out2.jpg > > > *Problems* > The core module takes a long time to compile. There are several reasons for > this. One is that it’s too big -- it would be better as several gradle > modules. Gradle is good about compiling multiple modules in parallel, but if > you have one really big module, you lose that parallelism. Another issue > with the core module is that it’s written in Scala, and the Scala compiler > takes longer than the Java one. > A lot of server-side code is in the “clients” module. From there, it ends up > on the CLASSPATH of producers, consumers, and admin clients. This has a lot > of bad effects: it bloats the size of the clients jar, and allows downstream > projects to peek at code that should be isolated to the broker. > A lot of tools can’t be put into the “tools” module because they depend on > classes that are in “core”. And tools can’t have a core dependency, because > that would impose a core dependency on connect as well. > One example of this problem is StorageTool and ClusterTool. These tools > ended up getting written in Scala and put in the “core” module, rather than > the “tools” module. > *Proposed Fixes* > Rename the “metadata” module to “controller” to reflect the fact that it > contains the controller > Make the "controller" module depend on "raft" rather than the other way > around ("raft" used to depend on "metadata") This reflects the fact that the > controller consumes the API provided by the raft module. 
(There is a > separate PR to do this.) > Create a new “server-common” module for common code which is shared by > several server modules, but not needed for clients. > Remove the dependency between "connect" and "tools" > Create a new “server-tools“ module which depends on “core” > *The Server-Common Module* > The server-common module should contain: > * Pluggable APIs that are used only in the server (not in any client) > * The KIP-405 tiered storage APIs > * Authorizer APIs > * CreateTopicPolicy, AlterConfigPolicy, etc. > * Common Java utility code that is used in the server, but not used in the > client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12722) Evaluate moving replaceSegments into LogSegments class
Kowshik Prakasam created KAFKA-12722: Summary: Evaluate moving replaceSegments into LogSegments class Key: KAFKA-12722 URL: https://issues.apache.org/jira/browse/KAFKA-12722 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam The logic to replace segments is currently present as static logic in Log.scala. Since it operates on top of `existingSegments`, we should see if we can move it into the LogSegments class, where it could be a better fit: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L2296. -- This message was sent by Atlassian Jira (v8.3.4#803005)
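The shape of that move can be sketched as follows. This is a hypothetical outline, not the actual Kafka API: the segment map holds strings, and replaceSegments operates on the class's own map instead of on a map handed in from Log.scala.

```java
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

class LogSegments {
    // Owned by the class, so replace logic no longer needs the map passed in.
    private final ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();

    void add(long baseOffset, String segment) { segments.put(baseOffset, segment); }

    // Swap oldOffsets' segments for newSegments: add the replacements first,
    // then remove only the old segments that were not themselves replaced,
    // mirroring the add-before-remove ordering used for segment swaps.
    void replaceSegments(List<Long> oldOffsets, ConcurrentSkipListMap<Long, String> newSegments) {
        segments.putAll(newSegments);
        for (long offset : oldOffsets) {
            if (!newSegments.containsKey(offset)) segments.remove(offset);
        }
    }

    int size() { return segments.size(); }

    public static void main(String[] args) {
        LogSegments log = new LogSegments();
        log.add(0L, "old-0");
        log.add(100L, "old-100");
        // An overflowed segment at offset 0 split into two new segments.
        ConcurrentSkipListMap<Long, String> split = new ConcurrentSkipListMap<>();
        split.put(0L, "new-0");
        split.put(50L, "new-50");
        log.replaceSegments(List.of(0L, 100L), split);
        System.out.println(log.size()); // segments at 0 and 50 remain -> prints "2"
    }
}
```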
[jira] [Assigned] (KAFKA-12575) Eliminate Log.isLogDirOffline boolean attribute
[ https://issues.apache.org/jira/browse/KAFKA-12575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam reassigned KAFKA-12575: Assignee: Kowshik Prakasam > Eliminate Log.isLogDirOffline boolean attribute > --- > > Key: KAFKA-12575 > URL: https://issues.apache.org/jira/browse/KAFKA-12575 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > This attribute was added in [https://github.com/apache/kafka/pull/9676] but > it is redundant and can be eliminated in favor of looking up > LogDirFailureChannel. The performance implication of a hash map inside > LogDirFailureChannel lookup should be low/none. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12575) Eliminate Log.isLogDirOffline boolean attribute
Kowshik Prakasam created KAFKA-12575: Summary: Eliminate Log.isLogDirOffline boolean attribute Key: KAFKA-12575 URL: https://issues.apache.org/jira/browse/KAFKA-12575 Project: Kafka Issue Type: Sub-task Components: core Reporter: Kowshik Prakasam This attribute was added in [https://github.com/apache/kafka/pull/9676] but it is redundant and can be eliminated in favor of looking up LogDirFailureChannel. The performance implication of a hash map inside LogDirFailureChannel lookup should be low/none. -- This message was sent by Atlassian Jira (v8.3.4#803005)
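The replacement lookup can be sketched like this. Class and method names are simplified for illustration (the real LogDirFailureChannel lives in kafka.server and carries error details): each Log checks the shared channel's map on demand instead of caching its own isLogDirOffline boolean, and a ConcurrentHashMap containsKey is cheap enough for that.

```java
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the shared failure channel.
class LogDirFailureChannel {
    private final ConcurrentHashMap<String, Boolean> offlineLogDirs = new ConcurrentHashMap<>();
    void maybeAddOfflineLogDir(String logDir) { offlineLogDirs.putIfAbsent(logDir, true); }
    // A Log would call this instead of reading its own boolean attribute.
    boolean hasOfflineLogDir(String logDir) { return offlineLogDirs.containsKey(logDir); }
}

public class OfflineDirLookup {
    public static void main(String[] args) {
        LogDirFailureChannel channel = new LogDirFailureChannel();
        String dir = "/var/kafka-logs";
        System.out.println(channel.hasOfflineLogDir(dir)); // prints "false"
        channel.maybeAddOfflineLogDir(dir);
        System.out.println(channel.hasOfflineLogDir(dir)); // prints "true"
    }
}
```

This also removes a piece of state that had to be kept consistent with the channel, which is the redundancy the ticket calls out.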
[jira] [Updated] (KAFKA-12571) Eliminate LogEndOffset dependency on LeaderEpochFileCache c'tor
[ https://issues.apache.org/jira/browse/KAFKA-12571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12571: - Description: *This is a precursor to KAFKA-12553.* Before refactoring the recovery logic (KAFKA-12553), we would like to move the logic to [initialize LeaderEpochFileCache|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/log/Log.scala#L579] outside the Log class. Once we suitably initialize LeaderEpochFileCache outside Log, we will be able to pass it as a dependency into both the Log class constructor and the recovery module. However, the LeaderEpochFileCache constructor takes a [dependency|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L42] on logEndOffset (via a callback). This dependency prevents the instantiation of LeaderEpochFileCache outside the Log class. This situation blocks the recovery logic (KAFKA-12553) refactor. So this constructor dependency on logEndOffset needs to be eliminated. It turns out the logEndOffset dependency is used in only one of the LeaderEpochFileCache methods, LeaderEpochFileCache.endOffsetFor, and just for one particular [case|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L201]. Therefore, it should be possible to modify this so that we only pass the logEndOffset as a parameter into endOffsetFor whenever the method is called. was: *This is a precursor to KAFKA-12553.* Before refactoring the recovery logic (KAFKA-12553), we would like to move the logic to [initialize LeaderEpochFileCache|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/log/Log.scala#L579] outside the Log class. Once we suitably initialize LeaderEpochFileCache outside Log, we will be able to pass it as a dependency into both the Log class constructor and the recovery module.
However, the LeaderEpochFileCache constructor takes a [dependency|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L42] on logEndOffset (via a callback). This dependency blocks the recovery logic (KAFKA-12553) refactor since it prevents the instantiation of LeaderEpochFileCache outside the Log class. So this dependency needs to be eliminated. It turns out the logEndOffset dependency is used in only one of the LeaderEpochFileCache methods, LeaderEpochFileCache.endOffsetFor, and just for one particular [case|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L201]. Therefore, it should be possible to modify this so that we only pass the logEndOffset as a parameter into endOffsetFor whenever the method is called. > Eliminate LogEndOffset dependency on LeaderEpochFileCache c'tor > --- > > Key: KAFKA-12571 > URL: https://issues.apache.org/jira/browse/KAFKA-12571 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > *This is a precursor to KAFKA-12553.* > Before refactoring the recovery logic (KAFKA-12553), we would like to move > the logic to [initialize > LeaderEpochFileCache|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/log/Log.scala#L579] > outside the Log class. Once we suitably initialize LeaderEpochFileCache > outside Log, we will be able to pass it as a dependency into both the Log class > constructor and the recovery module. However, the LeaderEpochFileCache > constructor takes a > [dependency|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L42] > on logEndOffset (via a callback). This dependency prevents the instantiation > of LeaderEpochFileCache outside the Log class. > This situation blocks the recovery logic (KAFKA-12553) refactor.
So this > constructor dependency on logEndOffset needs to be eliminated. > It turns out the logEndOffset dependency is used in only one of the > LeaderEpochFileCache methods, LeaderEpochFileCache.endOffsetFor, and just for > one particular [case|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L201]. Therefore, it should be possible to modify this so > that we only pass the logEndOffset as a parameter into endOffsetFor > whenever the method is called. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
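The before/after shape of this refactor can be sketched as follows. This is a simplified illustration, not the real LeaderEpochFileCache (which is Scala and backed by a checkpoint file): the constructor's logEndOffset callback is replaced by a plain parameter on endOffsetFor, which removes the Log dependency from construction.

```java
import java.util.function.Supplier;

// Before: the constructor needs a logEndOffset callback, so the cache
// cannot be instantiated outside the Log class.
class LeaderEpochCacheBefore {
    private final Supplier<Long> logEndOffset;
    LeaderEpochCacheBefore(Supplier<Long> logEndOffset) { this.logEndOffset = logEndOffset; }
    long endOffsetFor(int epoch, int latestEpoch) {
        // The callback is only consulted in the "requested epoch is the
        // latest epoch" case; -1 stands in for the undefined-offset result.
        return epoch == latestEpoch ? logEndOffset.get() : -1L;
    }
}

// After: no Log dependency at construction time; callers supply the
// current logEndOffset at each call site instead.
class LeaderEpochCacheAfter {
    long endOffsetFor(int epoch, int latestEpoch, long logEndOffset) {
        return epoch == latestEpoch ? logEndOffset : -1L;
    }
}

public class EndOffsetForRefactor {
    public static void main(String[] args) {
        LeaderEpochCacheBefore before = new LeaderEpochCacheBefore(() -> 42L);
        LeaderEpochCacheAfter after = new LeaderEpochCacheAfter();
        System.out.println(before.endOffsetFor(7, 7));     // prints "42"
        System.out.println(after.endOffsetFor(7, 7, 42L)); // prints "42"
    }
}
```

Since the callback is used in only that one method, pushing the value to the call site preserves behaviour while freeing the constructor.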
[jira] [Updated] (KAFKA-12571) Eliminate LeaderEpochFileCache constructor dependency on LogEndOffset
[ https://issues.apache.org/jira/browse/KAFKA-12571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12571: - Summary: Eliminate LeaderEpochFileCache constructor dependency on LogEndOffset (was: Eliminate LogEndOffset dependency on LeaderEpochFileCache c'tor) > Eliminate LeaderEpochFileCache constructor dependency on LogEndOffset > - > > Key: KAFKA-12571 > URL: https://issues.apache.org/jira/browse/KAFKA-12571 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > *This is a precursor to KAFKA-12553.* > Before refactoring the recovery logic (KAFKA-12553), we would like to move > the logic to [initialize > LeaderEpochFileCache|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/log/Log.scala#L579] > outside the Log class. Once we suitably initialize LeaderEpochFileCache > outside Log, we will be able to pass it as a dependency into both the Log class > constructor and the recovery module. However, the LeaderEpochFileCache > constructor takes a > [dependency|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L42] > on logEndOffset (via a callback). This dependency prevents the instantiation > of LeaderEpochFileCache outside the Log class. > This situation blocks the recovery logic (KAFKA-12553) refactor. So this > constructor dependency on logEndOffset needs to be eliminated. > It turns out the logEndOffset dependency is used in only one of the > LeaderEpochFileCache methods, LeaderEpochFileCache.endOffsetFor, and just for > one particular [case|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L201]. Therefore, it should be possible to modify this so > that we only pass the logEndOffset as a parameter into endOffsetFor > whenever the method is called. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12552) Extract segments map out of Log class into separate class
[ https://issues.apache.org/jira/browse/KAFKA-12552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12552: - Description: *This is a precursor to KAFKA-12553.* Extract segments map out of Log class into separate class. This will improve the testability and maintainability of the Log layer, and also will be useful to subsequently refactor the recovery logic (see KAFKA-12553). was:Extract segments map out of Log class into separate class. This will improve the testability and maintainability of the Log layer, and also will be useful to subsequently refactor the recovery logic (see KAFKA-12553). > Extract segments map out of Log class into separate class > - > > Key: KAFKA-12552 > URL: https://issues.apache.org/jira/browse/KAFKA-12552 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > *This is a precursor to KAFKA-12553.* > Extract segments map out of Log class into separate class. This will improve > the testability and maintainability of the Log layer, and also will be useful > to subsequently refactor the recovery logic (see KAFKA-12553). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12571) Eliminate LogEndOffset dependency on LeaderEpochFileCache c'tor
Kowshik Prakasam created KAFKA-12571: Summary: Eliminate LogEndOffset dependency on LeaderEpochFileCache c'tor Key: KAFKA-12571 URL: https://issues.apache.org/jira/browse/KAFKA-12571 Project: Kafka Issue Type: Sub-task Components: core Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam As a precursor to refactoring the recovery logic (KAFKA-12553), we would like to move the logic to [initialize LeaderEpochFileCache|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/log/Log.scala#L579] outside the Log class. This is so that we will be able to suitably initialize LeaderEpochFileCache outside Log and pass it as a dependency into both the Log class constructor and the recovery module. However, the LeaderEpochFileCache constructor takes a [dependency|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L42] on logEndOffset (via a callback). This dependency blocks the recovery logic (KAFKA-12553) refactor since it prevents the instantiation of LeaderEpochFileCache outside the Log class. So this dependency needs to be eliminated. It turns out the logEndOffset dependency is used in only one of the LeaderEpochFileCache methods, LeaderEpochFileCache.endOffsetFor, and just for one particular [case|https://github.com/apache/kafka/blob/d9bb2ef596343da9402bff4903b129cff1f7c22b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L201]. Therefore, it should be possible to modify this so that we only pass the logEndOffset as a parameter into endOffsetFor whenever the method is called. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12551) Refactor Kafka Log layer
Kowshik Prakasam created KAFKA-12551: Summary: Refactor Kafka Log layer Key: KAFKA-12551 URL: https://issues.apache.org/jira/browse/KAFKA-12551 Project: Kafka Issue Type: Improvement Components: core Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam This is an umbrella Jira that tracks the work items for the Log layer refactor as described here: [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit?usp=sharing] . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12551) Refactor Kafka Log layer
[ https://issues.apache.org/jira/browse/KAFKA-12551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12551: - Issue Type: Improvement (was: New Feature) > Refactor Kafka Log layer > > > Key: KAFKA-12551 > URL: https://issues.apache.org/jira/browse/KAFKA-12551 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > This is an umbrella Jira that tracks the work items for the Log layer > refactor as described here: > [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit?usp=sharing] > . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12552) Extract segments map out of Log class into separate class
[ https://issues.apache.org/jira/browse/KAFKA-12552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12552: - Description: Extract segments map out of Log class into separate class. This will improve the testability and maintainability of the Log layer, and also will be useful to subsequently refactor the recovery logic (see KAFKA-12553). (was: Extract segments map out of Log class into separate class. This will improve the testability and maintainability of the Log layer, and also will be useful to subsequently refactor the recovery logic (see ) > Extract segments map out of Log class into separate class > - > > Key: KAFKA-12552 > URL: https://issues.apache.org/jira/browse/KAFKA-12552 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > Extract segments map out of Log class into separate class. This will improve > the testability and maintainability of the Log layer, and also will be useful > to subsequently refactor the recovery logic (see KAFKA-12553). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12554) Split Log layer into UnifiedLog and LocalLog
Kowshik Prakasam created KAFKA-12554: Summary: Split Log layer into UnifiedLog and LocalLog Key: KAFKA-12554 URL: https://issues.apache.org/jira/browse/KAFKA-12554 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam Split Log layer into UnifiedLog and LocalLog based on the proposal described in this document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12553) Refactor Log layer recovery logic
Kowshik Prakasam created KAFKA-12553: Summary: Refactor Log layer recovery logic Key: KAFKA-12553 URL: https://issues.apache.org/jira/browse/KAFKA-12553 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam Refactor Log layer recovery logic by extracting it out of the kafka.log.Log class into separate modules. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12552) Extract segments map out of Log class into separate class
Kowshik Prakasam created KAFKA-12552: Summary: Extract segments map out of Log class into separate class Key: KAFKA-12552 URL: https://issues.apache.org/jira/browse/KAFKA-12552 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam Extract segments map out of Log class into separate class. This will be particularly useful to refactor the recovery logic in Log class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12551) Refactor Kafka Log layer
[ https://issues.apache.org/jira/browse/KAFKA-12551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12551: - Issue Type: New Feature (was: Improvement) > Refactor Kafka Log layer > > > Key: KAFKA-12551 > URL: https://issues.apache.org/jira/browse/KAFKA-12551 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > This is an umbrella Jira that tracks the work items for the Log layer > refactor as described here: > [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit?usp=sharing] > . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12240) Proposal for Log layer refactoring
Kowshik Prakasam created KAFKA-12240: Summary: Proposal for Log layer refactoring Key: KAFKA-12240 URL: https://issues.apache.org/jira/browse/KAFKA-12240 Project: Kafka Issue Type: Improvement Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam The document containing the proposed idea for the Log layer refactor for KIP-405 can be found here: [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit?usp=sharing] . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10832) Recovery logic is using incorrect ProducerStateManager instance when updating producers
Kowshik Prakasam created KAFKA-10832: Summary: Recovery logic is using incorrect ProducerStateManager instance when updating producers Key: KAFKA-10832 URL: https://issues.apache.org/jira/browse/KAFKA-10832 Project: Kafka Issue Type: Bug Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam The bug is that from within {{Log.updateProducers(…)}}, the code operates on the {{producerStateManager}} attribute of the {{Log}} instance instead of operating on an input parameter. Please see [this|https://github.com/apache/kafka/blob/1d84f543678c4c08800bc3ea18c04a9db8adf7e4/core/src/main/scala/kafka/log/Log.scala#L1464] LOC where it calls {{producerStateManager.prepareUpdate}} thus accessing the attribute from the {{Log}} object (see [this|https://github.com/apache/kafka/blob/1d84f543678c4c08800bc3ea18c04a9db8adf7e4/core/src/main/scala/kafka/log/Log.scala#L251]). This looks unusual particularly for {{Log.loadProducersFromLog(...)}} [path|https://github.com/apache/kafka/blob/1d84f543678c4c08800bc3ea18c04a9db8adf7e4/core/src/main/scala/kafka/log/Log.scala#L956]. Here I believe we should be using the instance passed to the method, rather than the attribute from the {{Log}} instance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
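The bug pattern described in KAFKA-10832 (a method receiving an instance as a parameter but mutating the object's own field instead) can be illustrated with a minimal sketch. The names below are hypothetical stand-ins for illustration only, not Kafka's actual signatures:

```java
// Hypothetical, simplified sketch of the bug pattern in KAFKA-10832.
// A stub standing in for ProducerStateManager.
class ProducerStateManagerStub {
    int updates = 0;
}

class LogSketch {
    // The Log instance's own attribute.
    private final ProducerStateManagerStub producerStateManager = new ProducerStateManagerStub();

    // Buggy version: ignores the parameter and mutates the Log's own field,
    // mirroring the prepareUpdate call described in the issue.
    void updateProducersBuggy(ProducerStateManagerStub psm) {
        producerStateManager.updates++; // should have been psm.updates++
    }

    // Fixed version: operates on the instance passed to the method.
    void updateProducersFixed(ProducerStateManagerStub psm) {
        psm.updates++;
    }
}
```

The bug is invisible on the common path where the caller happens to pass the Log's own attribute, which is why it only "looks unusual" on paths like loadProducersFromLog that construct a separate instance.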
[jira] [Updated] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even though the "shut down completed" message from KafkaServer is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. This is misleading and could break the general rule of avoiding post-shutdown activity in the Broker. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The code {{threadPools.foreach(_.shutdown())}} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). Consequently, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). Even after the "shut down completed" message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background, which is misleading.
*Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. There are a couple of different ways to fix this: # Replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to be shut down before the {{LogManager.shutdown}} call returns. # Skip creating the checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. This may require some changes to [this for loop|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L481-L496], so that we wait for all futures to complete regardless of whether one of them threw an error.
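The first proposed fix above can be sketched as follows. This is a minimal, hypothetical illustration in plain Java against {{java.util.concurrent.ExecutorService}} (the class name and the 30-second timeout are placeholders, not LogManager's actual code); note that {{awaitTermination}} does not itself initiate a shutdown, so it must follow {{shutdown()}} rather than replace it outright:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the "simple fix": after initiating an orderly shutdown of every
// pool, block until each pool has actually drained its queued tasks before
// returning control to the caller (i.e. the KafkaServer shutdown sequence).
class ThreadPoolDrainer {
    static boolean shutdownAndAwait(List<ExecutorService> threadPools) {
        // First pass: stop accepting new tasks; queued tasks keep running.
        for (ExecutorService pool : threadPools) {
            pool.shutdown();
        }
        // Second pass: wait for every pool to finish its queued tasks
        // (or for the illustrative timeout to elapse).
        boolean allTerminated = true;
        for (ExecutorService pool : threadPools) {
            try {
                allTerminated &= pool.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                allTerminated = false;
            }
        }
        return allTerminated;
    }
}
```

Shutting down all pools before awaiting any of them lets the pools drain in parallel, so the total wait is bounded by the slowest pool rather than the sum of all of them.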
[jira] [Updated] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. This is misleading and it could possibly break the general rule of avoiding post-shutdown activity in the Broker. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The code: _threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks will be accepted (see javadoc link [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._ As a result, if there is an exception during log closure, some of the thread pools which are closing logs could be leaked and continue to run in the background, after the control returns to the caller (i.e. {{KafkaServer}}). As a result, even after the "shut down completed" message is seen in the error logs (originating from {{KafkaServer}} shutdown sequence), log closures continue to happen in the background, which is misleading. 
*Proposed options for fixes:* It seems useful that we maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called once, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably couple ways to fix this: # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with _threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for all threads to be shutdown before returning the {{_LogManager.shutdown_}} call. # An alternate advanced fix could be to just skip creating of checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error. We continue to wait for all futures to complete for all directories. This can require some changes to [this for loop|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L481-L496], so that we wait for all futures to complete regardless of whether one of them threw an error. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. 
The code: _threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks will be accepted (see javadoc link [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._ As a result, if there is an exception during log closure, some of the thread pools which are closing logs could be leaked and continue to run in the background, after the control returns to the caller (i.e. {{KafkaServer}}). As a result, even after the "shut down completed" message is seen in the error logs (originating from {{KafkaServer}} shutdown sequence), log closures continue to happen in the background, which is quite misleading. *Proposed options for fixes:* It seems useful that we maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called once, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably couple ways to fix this: # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with _threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for all threads to be shutdown before returning the {{_LogManager.shutdown_}} call.
[jira] [Updated] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The code: _threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks will be accepted (see javadoc link [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._ As a result, if there is an exception during log closure, some of the thread pools which are closing logs could be leaked and continue to run in the background, after the control returns to the caller (i.e. {{KafkaServer}}). As a result, even after the "shut down completed" message is seen in the error logs (originating from {{KafkaServer}} shutdown sequence), log closures continue to happen in the background, which is quite misleading. *Proposed options for fixes:* It seems useful that we maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called once, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are probably couple ways to fix this: # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with _threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for all threads to be shutdown before returning the {{_LogManager.shutdown_}} call. # An alternate advanced fix could be to just skip creating of checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error. We continue to wait for all futures to complete for all directories. This can require some changes to [this for loop|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L481-L496], so that we wait for all futures to complete regardless of whether one of them threw an error. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. 
The code: _threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks will be accepted (see javadoc link [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._ As a result, if there is an exception during log closure, some of the thread pools which are closing logs could be leaked and continue to run in the background, after the control returns to the caller (i.e. {{KafkaServer}}). As a result, even after the "shut down completed" message is seen in the error logs (originating from {{KafkaServer}} shutdown sequence), log closures continue to happen in the background, which is quite misleading. *Proposed options for fixes:* It seems useful that we maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called once, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably couple ways to fix this: # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with _threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for all threads to be shutdown before returning the {{_LogManager.shutdown_}} call. # An advanced fix could be to just skip creating of checkpoint and clean shutdown file only for the affect
[jira] [Updated] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The code: _threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks will be accepted (see javadoc link [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._ As a result, if there is an exception during log closure, some of the thread pools which are closing logs could be leaked and continue to run in the background, after the control returns to the caller (i.e. {{KafkaServer}}). As a result, even after the "shut down completed" message is seen in the error logs (originating from {{KafkaServer}} shutdown sequence), log closures continue to happen in the background, which is quite misleading. *Proposed options for fixes:* It seems useful that we maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called once, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are probably couple ways to fix this: # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with _threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for all threads to be shutdown before returning the {{_LogManager.shutdown_}} call. # An advanced fix could be to just skip creating of checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error. We continue to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The code: _threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks will be accepted (see javadoc link [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._ As a result, if there is an exception during log closure, some of the thread pools which are closing logs could be leaked and continue to run in the background, after the control returns to the caller (i.e. {{KafkaServer}}). 
As a result, even after the "shut down completed" message is seen in the error logs (originating from {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful that we maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called once, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably couple ways to fix this: # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with _threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for all threads to be shutdown before returning the {{_LogManager.shutdown_}} call. # An advanced fix could be to just skip creating of checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error. We continue to wait for all futures to complete for all directories. > LogManager leaks internal thread pool activity during shutdown > -- > > Key: KAFKA-
[jira] [Updated] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The code: _threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks will be accepted (see javadoc link [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._ As a result, if there is an exception during log closure, some of the thread pools which are closing logs could be leaked and continue to run in the background, after the control returns to the caller (i.e. {{KafkaServer}}). As a result, even after the _shut down completed message is seen in the error logs (originating from {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful that we maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called once, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are probably couple ways to fix this: # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with _threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for all threads to be shutdown before returning the {{_LogManager.shutdown_}} call. # An advanced fix could be to just skip creating of checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error. We continue to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The code: _threadPools.foreach(_.shutdown())_ initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks will be accepted (see javadoc link [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools which are closing logs could be leaked and continue to run in the background, after the control returns to the caller (i.e. {{KafkaServer}}). 
As a result, even after the _shut down completed_ message is seen in the error logs (originating from {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful that we maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called once, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably couple ways to fix this: # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with _threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for all threads to be shutdown before returning the {{_LogManager.shutdown_}} call. # An advanced fix could be to just skip creating of checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error. We continue to wait for all futures to complete for all directories. > LogManager leaks internal thread pool activity during shutdown > -- > > Key: KAFKA-10723 > URL:
[jira] [Updated] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The code: _threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks will be accepted (see javadoc link [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._ As a result, if there is an exception during log closure, some of the thread pools which are closing logs could be leaked and continue to run in the background, after the control returns to the caller (i.e. {{KafkaServer}}). As a result, even after the "shut down completed" message is seen in the error logs (originating from {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful that we maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called once, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The call {{threadPools.foreach(_.shutdown())}} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). 
Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. > LogManager leaks internal thread pool activity during shutdown > -- > > Key: KAFKA-10723 > URL
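The gap between an orderly-but-asynchronous {{shutdown()}} and actually waiting for termination can be seen in a minimal, self-contained Java sketch. The class and the sleeping task below are hypothetical stand-ins, not Kafka code; note also that the real {{ExecutorService.awaitTermination}} requires a timeout and a {{TimeUnit}}, unlike the shorthand used in the proposal:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    // Returns {terminated right after shutdown(), terminated after awaitTermination()}.
    static boolean[] demo() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Stand-in for a slow "close the logs" task still in flight at shutdown time.
        pool.submit(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown();                             // returns immediately; the task keeps running
        boolean doneRightAway = pool.isTerminated(); // false: the queued task has not finished yet

        // Blocking until in-flight tasks complete is what the proposed fix adds.
        boolean doneAfterWait = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] { doneRightAway, doneAfterWait };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println("terminated right after shutdown(): " + r[0]);
        System.out.println("terminated after awaitTermination(): " + r[1]);
    }
}
```

This mirrors the leak described above: after {{shutdown()}} returns, the pool is not yet terminated, so a caller that stops at {{shutdown()}} can log "shut down completed" while the close task is still running in the background.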
[jira] [Updated] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. The call {{threadPools.foreach(_.shutdown())}} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). 
Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. > LogManager leaks internal thread pool activity during shutdown > -- > > Key: KAFKA-10723 >
[jira] [Updated] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). 
Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. > LogManager leaks internal thread pool activity during shutdown > -- > > Key: KAFKA-10723 >
[jira] [Updated] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Summary: LogManager leaks internal thread pool activity during shutdown (was: LogManager shutdown error handler doesn't shutdown all internal thread pools) > LogManager leaks internal thread pool activity during shutdown > -- > > Key: KAFKA-10723 > URL: https://issues.apache.org/jira/browse/KAFKA-10723 > Project: Kafka > Issue Type: Bug >Reporter: Kowshik Prakasam >Priority: Major > > *TL;DR:* > The asynchronous shutdown in {{LogManager}} has the shortcoming that if > any of the internal futures fail during shutdown, we do not always > ensure that all futures are completed before {{LogManager.shutdown}} returns. > As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen > in the error logs, some futures inside LogManager continue to run, > attempting to close the logs. > *Description:* > When LogManager is shutting down, exceptions in log closure are handled > [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. > However, this > [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] > in the finally clause shuts down the thread pools *asynchronously*. > {noformat} > threadPools.foreach(_.shutdown()){noformat} > initiates an orderly shutdown of each thread pool, in which previously > submitted tasks are executed but no new tasks are accepted (see the javadoc > [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). > As a result, if there is an exception during log closure, some of the thread > pools that are closing logs can be leaked and continue to run in the > background after control returns to the caller (i.e. {{KafkaServer}}). 
> Consequently, even after the _shut down completed_ message is seen in the > error logs (originating from the {{KafkaServer}} shutdown sequence), log closures > continue to happen in the background. > > *Proposed options for fixes:* > It seems useful to maintain the contract with {{KafkaServer}} that once > {{LogManager.shutdown}} is called, all tasks that close the logs are > guaranteed to have completed before the call returns. There are probably > a couple of ways to fix this: > # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with > {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait > for all threads to shut down before returning from the {{LogManager.shutdown}} > call. > # An advanced fix could be to skip creating the checkpoint and clean-shutdown > files only for the affected directory if any of its futures throw an > error, while continuing to wait for all futures to complete for all directories. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10723) LogManager shutdown error handler doesn't shutdown all internal thread pools
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). 
Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. > LogManager shutdown error handler doesn't shutdown all internal thread pools > > >
[jira] [Updated] (KAFKA-10723) LogManager shutdown error handler doesn't shutdown all internal thread pools
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). 
Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. > LogManager shutdown error handler doesn't shutdown all internal thread pools > > >
[jira] [Updated] (KAFKA-10723) LogManager shutdown error handler doesn't shutdown all internal thread pools
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). 
Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. There are probably a couple of ways to fix this: # A simple fix could be to replace {{threadPools.foreach(_.shutdown())}} with {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait for all threads to shut down before returning from the {{LogManager.shutdown}} call. # An advanced fix could be to skip creating the checkpoint and clean-shutdown files only for the affected directory if any of its futures throw an error, while continuing to wait for all futures to complete for all directories. > LogManager shutdown error handler doesn't shutdown all internal thread pools > > > Key: KA
[jira] [Updated] (KAFKA-10723) LogManager shutdown error handler doesn't shutdown all internal thread pools
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even after the _shut down completed_ message from {{KafkaServer}} is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown of each thread pool, in which previously submitted tasks are executed but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs can be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that once {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are a couple of ways to fix this: # A simple fix could be to replace `threadPools.foreach(_.shutdown())` with `threadPools.foreach(_.awaitTermination())`. This ensures that we wait for all threads to shut down before the `LogManager.shutdown` call returns. # An advanced fix could be to skip creating the checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error. We continue to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even though the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs could be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). 
Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. There are a couple of ways to fix this: # A simple fix could be to replace `threadPools.foreach(_.shutdown())` with `threadPools.foreach(_.awaitTermination())`. This ensures that we wait for all threads to shut down before `LogManager.shutdown` returns. # An advanced fix could be to skip creating the checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error. We continue to wait for all futures to complete for all directories. > LogManager shutdown error handler doesn't shutdown all internal thread pools > > > Key: KAFKA-10723 >
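The difference between the current behavior and the first proposed fix can be sketched in plain JDK terms. This is an illustrative sketch against java.util.concurrent only, not Kafka's actual LogManager code: `shutdown()` returns immediately while queued tasks keep running, whereas a subsequent `awaitTermination(...)` blocks until they finish, so no work leaks past the shutdown call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownVsAwait {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);

        // Simulates a slow "close the log" task still running during shutdown.
        pool.submit(() -> {
            try { Thread.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        pool.shutdown(); // returns immediately; the task above is likely still running
        System.out.println("after shutdown(): terminated=" + pool.isTerminated());

        // Blocking here is what the proposed fix adds: no closing work leaks past this point.
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("after awaitTermination(): terminated=" + pool.isTerminated());
    }
}
```

Note that in the JDK API `awaitTermination` does not itself initiate shutdown, so the fix amounts to calling it after `shutdown()` on each pool rather than literally replacing the call.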
[jira] [Updated] (KAFKA-10723) LogManager shutdown error handler doesn't shutdown all internal thread pools
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10723: - Description: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even though the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause shuts down the thread pools *asynchronously*. {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). As a result, if there is an exception during log closure, some of the thread pools that are closing logs could be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). Consequently, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed options for fixes:* It seems useful to maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called, all tasks that close the logs are guaranteed to have completed before the call returns. 
There are a couple of ways to fix this: # A simple fix could be to replace `threadPools.foreach(_.shutdown())` with `threadPools.foreach(_.awaitTermination())`. This ensures that we wait for all threads to shut down before `LogManager.shutdown` returns. # An advanced fix could be to skip creating the checkpoint and clean shutdown file only for the affected directory if any of its futures throw an error. We continue to wait for all futures to complete for all directories. was: *TL;DR:* The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before {{LogManager.shutdown}} returns. As a result, even though the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause may itself throw an exception (for example, if any of the tasks fail during thread pool shutdown): {noformat} threadPools.foreach(_.shutdown()){noformat} initiates an orderly shutdown (for each thread pool) in which previously submitted tasks are executed, but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). It is possible that any of the other running threads that asynchronously close logs may throw an exception. When this happens, {{ExecutorService.shutdown()}} can throw an exception. 
This situation is not handled inside the finally clause, which means that some of the thread pools that are closing logs could be leaked and continue to run in the background after control returns to the caller (i.e. {{KafkaServer}}). As a result, even after the _shut down completed_ message is seen in the error logs (originating from the {{KafkaServer}} shutdown sequence), log closures continue to happen in the background. *Proposed fix:* It seems useful to maintain the contract with {{KafkaServer}} that after {{LogManager.shutdown}} is called, every thread pool that closes logs has shutdown attempted on it before control returns to the caller (whether via an exception or a success case). If multiple exceptions are raised while closing the logs (from different thread pools), we could raise just the first of them to the caller of LogManager.shutdown (i.e. {{KafkaServer}}) while optionally logging the rest of the exceptions to stderr. > LogManager shutdown error handler doesn't shutdown all internal thread pools >
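The contract proposed above (attempt shutdown on every pool, then surface the first failure) can be sketched as follows. This is a hedged illustration, not the actual Kafka patch; the `threadPools` list, the helper name, and the 60-second timeout are assumptions for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownAllPools {
    // Attempts shutdown on every pool even if some fail, remembers the first
    // failure, logs the rest, and rethrows the first one only after the loop.
    static void shutdownAll(List<ExecutorService> threadPools) throws InterruptedException {
        RuntimeException first = null;
        for (ExecutorService pool : threadPools) {
            try {
                pool.shutdown();
                pool.awaitTermination(60, TimeUnit.SECONDS); // wait, don't leak work
            } catch (RuntimeException e) {
                if (first == null) first = e;                 // raise only this one later
                else System.err.println("suppressed: " + e);  // optionally log the rest
            }
        }
        if (first != null) throw first;
    }

    public static void main(String[] args) throws InterruptedException {
        List<ExecutorService> pools = Arrays.asList(
                Executors.newFixedThreadPool(1), Executors.newFixedThreadPool(1));
        shutdownAll(pools);
        System.out.println("all pools terminated");
    }
}
```

The key property is that the loop never exits early on failure, so the caller regains control only after every pool has had shutdown attempted.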
[jira] [Created] (KAFKA-10723) LogManager shutdown error handler doesn't shutdown all internal thread pools
Kowshik Prakasam created KAFKA-10723: Summary: LogManager shutdown error handler doesn't shutdown all internal thread pools Key: KAFKA-10723 URL: https://issues.apache.org/jira/browse/KAFKA-10723 Project: Kafka Issue Type: Bug Reporter: Kowshik Prakasam *TL;DR:* The asynchronous shutdown in LogManager has the shortcoming that if any of the internal futures fail during shutdown, we do not always ensure that all futures are completed before LogManager.shutdown returns. As a result, even though the shut down completed message from KafkaServer is seen in the error logs, some futures inside LogManager continue to run, attempting to close the logs. *Description:* When LogManager is shutting down, exceptions in log closure are handled [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. However, this [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] in the finally clause may still throw an exception (e.g. if any of the tasks fail during thread pool shutdown): threadPools.foreach(_.shutdown()) Note that ExecutorService.shutdown() initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks are accepted (see the javadoc [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). It is possible that any of the other running threads that asynchronously close logs may throw an exception. When this happens, ExecutorService.shutdown() can throw an exception. This situation is not handled inside the finally clause, which means that some of the thread pools that are closing logs could be leaked and continue to run in the background after control returns to the caller (i.e. KafkaServer). As a result, even after the `shut down completed` message is seen in the error logs, log closures continue to happen in the background. 
*Proposed fix:* We should maintain the contract with KafkaServer that once LogManager.shutdown is called, a shutdown attempt is made on every thread pool that closes the logs before control returns to the caller (whether via an exception or a success). When multiple exceptions are raised while closing the logs (from different thread pools), we can raise the first of these to the caller of LogManager.shutdown (i.e. KafkaServer) while optionally logging the rest to stderr. -- This message was sent by Atlassian Jira (v8.3.4#803005)
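The proposed contract can be sketched as a small helper (the name `shutdownAllPools` and its shape are illustrative, not actual LogManager code): attempt shutdown on every pool, collect any failures, and only after all pools have been attempted rethrow the first failure while logging the rest.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical helper illustrating the proposed fix: a shutdown attempt is
// made on every pool before control returns to the caller. If several
// attempts fail, only the first exception is rethrown; the rest are logged.
def shutdownAllPools(threadPools: Seq[ExecutorService]): Unit = {
  val failures = ArrayBuffer.empty[Throwable]
  threadPools.foreach { pool =>
    try pool.shutdown()
    catch { case NonFatal(e) => failures += e }
  }
  // Log suppressed exceptions, then surface the first one to the caller.
  failures.drop(1).foreach(e => System.err.println(s"Suppressed during shutdown: $e"))
  failures.headOption.foreach(e => throw e)
}
```

With this shape, KafkaServer sees at most one exception from LogManager.shutdown, and no thread pool is left without a shutdown attempt.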
[jira] [Assigned] (KAFKA-10624) [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration
[ https://issues.apache.org/jira/browse/KAFKA-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam reassigned KAFKA-10624: Assignee: Kowshik Prakasam > [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration > > > Key: KAFKA-10624 > URL: https://issues.apache.org/jira/browse/KAFKA-10624 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Minor > > In Scala, we prefer sealed traits over Enumeration since the former gives you > exhaustiveness checking. With Scala Enumeration, you don't get a warning if > you add a new value that is not handled in a given pattern match. > This Jira tracks refactoring > [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801] > from an Enumeration to a sealed trait. -- This message was sent by Atlassian Jira (v8.3.4#803005)
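A minimal sketch of the exhaustiveness argument (the `Status` type and its values are illustrative, not the actual FeatureZNodeStatus definitions): with a sealed trait, the compiler knows the full set of subtypes, so a pattern match that omits one produces a "match may not be exhaustive" warning, which Scala's Enumeration cannot offer.

```scala
// Illustrative sealed trait: all subtypes must live in this file, so the
// compiler can check pattern matches for exhaustiveness.
sealed trait Status
case object Enabled extends Status
case object Disabled extends Status

def describe(s: Status): String = s match {
  case Enabled  => "enabled"
  case Disabled => "disabled"
  // Removing either case above would trigger a compiler warning;
  // an Enumeration-based match would compile silently.
}
```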
[jira] [Updated] (KAFKA-10139) [Easy] Add operational guide for failure recovery
[ https://issues.apache.org/jira/browse/KAFKA-10139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10139: - Summary: [Easy] Add operational guide for failure recovery (was: Add operational guide for failure recovery) > [Easy] Add operational guide for failure recovery > - > > Key: KAFKA-10139 > URL: https://issues.apache.org/jira/browse/KAFKA-10139 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Boyang Chen >Priority: Major > > In the first released version, we should include an operation manual to the > feature versioning failure cases, such as: > 1. broker crash due to violation of feature versioning > 2. ZK data corruption (rare) > We need to ensure this work gets reflected in the AK documentation after the > implementation and thorough testings are done. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10622) [Medium] Implement support for feature version deprecation
[ https://issues.apache.org/jira/browse/KAFKA-10622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10622: - Summary: [Medium] Implement support for feature version deprecation (was: Implement support for feature version deprecation) > [Medium] Implement support for feature version deprecation > -- > > Key: KAFKA-10622 > URL: https://issues.apache.org/jira/browse/KAFKA-10622 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > This Jira tracks the implementation of feature version deprecation support > for KIP-584. > Feature version deprecation is future work > ([link|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Featureversiondeprecation]). > We didn’t implement it as part of the AK 2.7 release for KIP-584 because no > features are defined yet in AK 2.7 and it will be a long time (years) before > we start to deprecate feature versions, so there is no immediate need for the > support. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10621) [Easy] Implement advanced CLI tool for feature versioning system
[ https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10621: - Summary: [Easy] Implement advanced CLI tool for feature versioning system (was: Implement advanced CLI tool for feature versioning system) > [Easy] Implement advanced CLI tool for feature versioning system > > > Key: KAFKA-10621 > URL: https://issues.apache.org/jira/browse/KAFKA-10621 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > Implement advanced CLI tool capabilities for the feature versioning system > providing the facilities as explained in [this > section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] > of KIP-584. The implementation needs to be done in > [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] > class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10624) [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration
[ https://issues.apache.org/jira/browse/KAFKA-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10624: - Summary: [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration (was: FeatureZNodeStatus should use sealed trait instead of Enumeration) > [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration > > > Key: KAFKA-10624 > URL: https://issues.apache.org/jira/browse/KAFKA-10624 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > In Scala, we prefer sealed traits over Enumeration since the former gives you > exhaustiveness checking. With Scala Enumeration, you don't get a warning if > you add a new value that is not handled in a given pattern match. > This Jira tracks refactoring > [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801] > from an Enumeration to a sealed trait. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10623) [Easy] Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange
[ https://issues.apache.org/jira/browse/KAFKA-10623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10623: - Summary: [Easy] Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange (was: Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange) > [Easy] Refactor code to avoid discovery conflicts for > classes:{Supported|Finalized}VersionRange > --- > > Key: KAFKA-10623 > URL: https://issues.apache.org/jira/browse/KAFKA-10623 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > This Jira suggests changing a few existing class names to avoid class > discovery conflicts. In particular, the following classes: > {code:java} > org.apache.kafka.clients.admin.{Supported|Finalized}VersionRange{code} > conflict with > > > {code:java} > org.apache.kafka.common.feature.{Supported|Finalized}VersionRange{code} > The former is internal facing, while the latter is external facing (since it > is used in the Admin#describeFeatures API). So, the internal facing classes > can be renamed suitably. Possible alternative naming suggestions: > > > {code:java} > org.apache.kafka.clients.admin.{Supported|Finalized}Versions > {code} > {code:java} > org.apache.kafka.clients.admin.Broker{Supported|Finalized}Versions > {code} > {code:java} > org.apache.kafka.clients.admin.Broker{Supported|Finalized}VersionRange{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10624) FeatureZNodeStatus should use sealed trait instead of Enumeration
Kowshik Prakasam created KAFKA-10624: Summary: FeatureZNodeStatus should use sealed trait instead of Enumeration Key: KAFKA-10624 URL: https://issues.apache.org/jira/browse/KAFKA-10624 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam In Scala, we prefer sealed traits over Enumeration since the former gives you exhaustiveness checking. With Scala Enumeration, you don't get a warning if you add a new value that is not handled in a given pattern match. This Jira tracks refactoring [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801] from an Enumeration to a sealed trait. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10623) Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange
Kowshik Prakasam created KAFKA-10623: Summary: Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange Key: KAFKA-10623 URL: https://issues.apache.org/jira/browse/KAFKA-10623 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam This Jira suggests changing a few existing class names to avoid class discovery conflicts. In particular, the following classes: {code:java} org.apache.kafka.clients.admin.{Supported|Finalized}VersionRange{code} conflict with {code:java} org.apache.kafka.common.feature.{Supported|Finalized}VersionRange{code} The former is internal facing, while the latter is external facing (since it is used in the Admin#describeFeatures API). So, the internal facing classes can be renamed suitably. Possible alternative naming suggestions: {code:java} org.apache.kafka.clients.admin.{Supported|Finalized}Versions {code} {code:java} org.apache.kafka.clients.admin.Broker{Supported|Finalized}Versions {code} {code:java} org.apache.kafka.clients.admin.Broker{Supported|Finalized}VersionRange{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10622) Implement support for feature version deprecation
Kowshik Prakasam created KAFKA-10622: Summary: Implement support for feature version deprecation Key: KAFKA-10622 URL: https://issues.apache.org/jira/browse/KAFKA-10622 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam This Jira tracks the implementation of feature version deprecation support for KIP-584. Feature version deprecation is future work ([link|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Featureversiondeprecation]). We didn’t implement it as part of the AK 2.7 release for KIP-584 because no features are defined yet in AK 2.7 and it will be a long time (years) before we start to deprecate feature versions, so there is no immediate need for the support. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10621) Implement advanced CLI tool for feature versioning system
[ https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10621: - Description: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [this section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. (was: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [[this section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]|#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. ) > Implement advanced CLI tool for feature versioning system > - > > Key: KAFKA-10621 > URL: https://issues.apache.org/jira/browse/KAFKA-10621 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > Implement advanced CLI tool capabilities for the feature versioning system > providing the facilities as explained in [this > section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] > of KIP-584. The implementation needs to be done in > [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] > class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10621) Implement advanced CLI tool for feature versioning system
[ https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10621: - Description: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [this section|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. (was: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in this section of KIP-584: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] . The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. ) > Implement advanced CLI tool for feature versioning system > - > > Key: KAFKA-10621 > URL: https://issues.apache.org/jira/browse/KAFKA-10621 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > Implement advanced CLI tool capabilities for the feature versioning system > providing the facilities as explained in [this > section|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] > of KIP-584. The implementation needs to be done in > [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] > class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10621) Implement advanced CLI tool for feature versioning system
[ https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10621: - Description: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [[this section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]|#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. (was: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [this section|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. ) > Implement advanced CLI tool for feature versioning system > - > > Key: KAFKA-10621 > URL: https://issues.apache.org/jira/browse/KAFKA-10621 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > Implement advanced CLI tool capabilities for the feature versioning system > providing the facilities as explained in [[this > section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]|#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] > of KIP-584. The implementation needs to be done in > [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] > class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10621) Implement advanced CLI tool for feature versioning system
Kowshik Prakasam created KAFKA-10621: Summary: Implement advanced CLI tool for feature versioning system Key: KAFKA-10621 URL: https://issues.apache.org/jira/browse/KAFKA-10621 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in this section of KIP-584: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] . The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10599) Implement basic CLI tool for feature versioning system
Kowshik Prakasam created KAFKA-10599: Summary: Implement basic CLI tool for feature versioning system Key: KAFKA-10599 URL: https://issues.apache.org/jira/browse/KAFKA-10599 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam Implement a basic CLI tool for the feature versioning system providing the basic facilities as explained in this section of KIP-584: https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-BasicCLItoolusage -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-4620) Connection exceptions in JMXTool do not make it to the top level
[ https://issues.apache.org/jira/browse/KAFKA-4620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159392#comment-17159392 ] Kowshik Prakasam commented on KAFKA-4620: - This issue appears to be resolved now, given the following code: [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/JmxTool.scala#L131-L154]. Perhaps this was fixed as early as this PR: [https://github.com/apache/kafka/pull/3547] > Connection exceptions in JMXTool do not make it to the top level > > > Key: KAFKA-4620 > URL: https://issues.apache.org/jira/browse/KAFKA-4620 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > > If you run JMXTool before the target process is initialized, the JMX > connection is refused and the tool quits. > Adding the following retry : > {code:java} > while (retries < maxNumRetries && !connected) { > try { > System.err.println("Trying to connect to JMX url: %s".format(url)) > jmxc = JMXConnectorFactory.connect(url, null) > mbsc = jmxc.getMBeanServerConnection() > connected = true > } catch { > case e : Exception => { > System.err.println("Could not connect to JMX url: %s. Exception > %s".format(url, e.getMessage)) > retries += 1 > Thread.sleep(500) > } > } > } > {code} > does not work because the exceptions do not make it to the top level. 
Running > the above code results in the following output on stderr > {noformat} > Trying to connect to JMX url: > service:jmx:rmi:///jndi/rmi://127.0.0.1:9192/jmxrmi > Jan 11, 2017 8:20:33 PM ClientCommunicatorAdmin restart > WARNING: Failed to restart: java.io.IOException: Failed to get a RMI stub: > javax.naming.ServiceUnavailableException [Root exception is > java.rmi.ConnectException: Connection refused to host: 127.0.0.1; nested > exception is: > java.net.ConnectException: Connection refused] > Jan 11, 2017 8:20:33 PM RMIConnector RMIClientCommunicatorAdmin-doStop > WARNING: Failed to call the method close():java.rmi.ConnectException: > Connection refused to host: 172.31.15.109; nested exception is: > java.net.ConnectException: Connection refused > Jan 11, 2017 8:20:33 PM ClientCommunicatorAdmin Checker-run > WARNING: Failed to check connection: java.net.ConnectException: Connection > refused > Jan 11, 2017 8:20:33 PM ClientCommunicatorAdmin Checker-run > WARNING: stopping > {noformat} > We need to add working retry logic to JMXTool so that it can start correctly > even if the target process is not ready initially. -- This message was sent by Atlassian Jira (v8.3.4#803005)
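One illustrative shape for the requested retry logic (the helper name and parameters are hypothetical, not the actual JmxTool code): pass the connection attempt in as a thunk, retry a bounded number of times, and rethrow the last failure once retries are exhausted so that a persistent connection failure surfaces at the top level instead of being swallowed.

```scala
// Hypothetical retry helper: retries `attempt` up to maxRetries times with a
// short backoff, and rethrows the last exception if every attempt fails.
def connectWithRetry[T](maxRetries: Int, backoffMs: Long = 10)(attempt: () => T): T = {
  var lastError: Throwable = null
  var retries = 0
  while (retries < maxRetries) {
    try {
      return attempt() // success: hand the connection back to the caller
    } catch {
      case e: Exception =>
        lastError = e
        retries += 1
        Thread.sleep(backoffMs)
    }
  }
  throw lastError // exhausted: make the failure visible at the top level
}
```

In JMXTool this thunk would wrap the JMXConnectorFactory.connect call; the key difference from the quoted snippet is that a final failure is rethrown rather than lost.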
[jira] [Assigned] (KAFKA-10028) Implement write path for feature versioning scheme
[ https://issues.apache.org/jira/browse/KAFKA-10028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam reassigned KAFKA-10028: Assignee: Kowshik Prakasam > Implement write path for feature versioning scheme > -- > > Key: KAFKA-10028 > URL: https://issues.apache.org/jira/browse/KAFKA-10028 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > The goal is to implement various classes and integration for the write path > of the feature versioning system > ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). > This is preceded by the read path implementation (KAFKA-10027). The write > path implementation involves developing the new controller API, > UpdateFeatures, which enables transactional application of a set of > cluster-wide feature updates to the ZK {{'/features'}} node, along with > required ACL permissions. > > Details about the write path are explained [in this > part|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-ChangestoKafkaController] > of the KIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10157) Multiple tests failed due to "Failed to process feature ZK node change event"
[ https://issues.apache.org/jira/browse/KAFKA-10157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam resolved KAFKA-10157. -- Resolution: Fixed > Multiple tests failed due to "Failed to process feature ZK node change event" > - > > Key: KAFKA-10157 > URL: https://issues.apache.org/jira/browse/KAFKA-10157 > Project: Kafka > Issue Type: Bug >Reporter: Anna Povzner >Assignee: Kowshik Prakasam >Priority: Major > > Multiple tests failed due to "Failed to process feature ZK node change > event". Looks like a result of merge of this PR: > [https://github.com/apache/kafka/pull/8680] > Note that running tests without `--info` gives output like this one: > {quote}Process 'Gradle Test Executor 36' finished with non-zero exit value 1 > {quote} > kafka.network.DynamicConnectionQuotaTest failed: > {quote} > kafka.network.DynamicConnectionQuotaTest > testDynamicConnectionQuota > STANDARD_OUT > [2020-06-11 20:52:42,596] ERROR [feature-zk-node-event-process-thread]: > Failed to process feature ZK node change event. The broker will eventually > exit. > (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76) > java.lang.InterruptedException > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090) > at > java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147){quote} > > kafka.api.CustomQuotaCallbackTest failed: > {quote} [2020-06-11 21:07:36,745] ERROR > [feature-zk-node-event-process-thread]: Failed to process feature ZK node > change event. The broker will eventually exit. 
> (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76) > java.lang.InterruptedException > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090) > at > java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) > at scala.util.control.Exception$Catch.apply(Exception.scala:227) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) > at scala.util.control.Exception$Catch.apply(Exception.scala:227) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > {quote} > > kafka.server.DynamicBrokerReconfigurationTest failed: > {quote} [2020-06-11 21:13:01,207] ERROR > [feature-zk-node-event-process-thread]: Failed to process feature ZK node > change event. The broker will eventually exit. 
> (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76) > java.lang.InterruptedException > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090) > at > java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) > at scala.util.control.Exception$Catch.apply(Exception.scala:227) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > {quote} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10157) Multiple tests failed due to "Failed to process feature ZK node change event"
[ https://issues.apache.org/jira/browse/KAFKA-10157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam reassigned KAFKA-10157: Assignee: Kowshik Prakasam > Multiple tests failed due to "Failed to process feature ZK node change event" > - > > Key: KAFKA-10157 > URL: https://issues.apache.org/jira/browse/KAFKA-10157 > Project: Kafka > Issue Type: Bug >Reporter: Anna Povzner >Assignee: Kowshik Prakasam >Priority: Major > > Multiple tests failed due to "Failed to process feature ZK node change > event". Looks like a result of merge of this PR: > [https://github.com/apache/kafka/pull/8680] > Note that running tests without `--info` gives output like this one: > {quote}Process 'Gradle Test Executor 36' finished with non-zero exit value 1 > {quote} > kafka.network.DynamicConnectionQuotaTest failed: > {quote} > kafka.network.DynamicConnectionQuotaTest > testDynamicConnectionQuota > STANDARD_OUT > [2020-06-11 20:52:42,596] ERROR [feature-zk-node-event-process-thread]: > Failed to process feature ZK node change event. The broker will eventually > exit. > (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76) > java.lang.InterruptedException > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090) > at > java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147){quote} > > kafka.api.CustomQuotaCallbackTest failed: > {quote} [2020-06-11 21:07:36,745] ERROR > [feature-zk-node-event-process-thread]: Failed to process feature ZK node > change event. The broker will eventually exit. 
> (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76) > java.lang.InterruptedException > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090) > at > java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) > at scala.util.control.Exception$Catch.apply(Exception.scala:227) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) > at scala.util.control.Exception$Catch.apply(Exception.scala:227) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > {quote} > > kafka.server.DynamicBrokerReconfigurationTest failed: > {quote} [2020-06-11 21:13:01,207] ERROR > [feature-zk-node-event-process-thread]: Failed to process feature ZK node > change event. The broker will eventually exit. 
> (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76) > java.lang.InterruptedException > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) > at > java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090) > at > java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) > at scala.util.control.Exception$Catch.apply(Exception.scala:227) > at > kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > {quote} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
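The stack traces above all show the same pattern: a processor thread blocked in {{LinkedBlockingQueue.take()}} is interrupted during test teardown, and the {{InterruptedException}} is logged as a fatal "Failed to process feature ZK node change event" error even though the interrupt is an expected part of shutdown. The following is a minimal, hypothetical sketch of that pattern (it is not Kafka's actual ShutdownableThread or FinalizedFeatureChangeListener code): the worker treats an interrupt delivered while blocked on the queue as a stop signal and exits quietly instead of reporting an error.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueProcessorSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final CountDownLatch stopped = new CountDownLatch(1);
    private final Thread worker = new Thread(this::run, "change-event-process-thread");

    private void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String event = queue.take(); // blocks; an interrupt unblocks it
                process(event);
            }
        } catch (InterruptedException e) {
            // Expected during shutdown: exit quietly rather than logging a fatal error.
            Thread.currentThread().interrupt();
        } finally {
            stopped.countDown();
        }
    }

    private void process(String event) { /* handle the change notification */ }

    public void start() { worker.start(); }

    // Interrupts the worker and waits for it to stop; true means a clean stop.
    public boolean shutdown(long timeoutMs) throws InterruptedException {
        worker.interrupt();
        return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        QueueProcessorSketch p = new QueueProcessorSketch();
        p.start();
        System.out.println(p.shutdown(5000) ? "clean shutdown" : "shutdown timed out");
    }
}
```

The key design point is distinguishing an interrupt that arrives while the thread is intentionally being stopped from a genuine processing failure; only the latter deserves an ERROR-level log line.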
[jira] [Updated] (KAFKA-10027) Implement read path for versioning scheme for features
[ https://issues.apache.org/jira/browse/KAFKA-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10027: - Summary: Implement read path for versioning scheme for features (was: KIP-584: Implement read path for versioning scheme for features) > Implement read path for versioning scheme for features > -- > > Key: KAFKA-10027 > URL: https://issues.apache.org/jira/browse/KAFKA-10027 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Major > > The goal is to implement the various classes and integration for the read path of the > feature versioning system > ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). > The ultimate plan is that the cluster-wide *finalized* features information > will be stored in ZK under the node {{/feature}}. The read path > implemented in this PR is centered around reading this *finalized* features > information from ZK and processing it inside the broker. > > Here is a summary of what's needed for this Jira (a lot of it is *new* > classes): > * A facility is provided in the broker to declare its supported features, > and advertise them via its own {{BrokerIdZNode}} under a > {{features}} key. > * A facility is provided in the broker to listen to and propagate > cluster-wide *finalized* feature changes from ZK. > * When new *finalized* features are read from ZK, feature incompatibilities > are detected by comparing against the broker's own supported features. > * {{ApiVersionsResponse}} is now served containing supported and finalized > feature information (using the newly added tagged fields). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10027) Implement read path for feature versioning scheme
[ https://issues.apache.org/jira/browse/KAFKA-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10027: - Summary: Implement read path for feature versioning scheme (was: Implement read path for versioning scheme for features) > Implement read path for feature versioning scheme > - > > Key: KAFKA-10027 > URL: https://issues.apache.org/jira/browse/KAFKA-10027 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Major > > The goal is to implement the various classes and integration for the read path of the > feature versioning system > ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). > The ultimate plan is that the cluster-wide *finalized* features information > will be stored in ZK under the node {{/feature}}. The read path > implemented in this PR is centered around reading this *finalized* features > information from ZK and processing it inside the broker. > > Here is a summary of what's needed for this Jira (a lot of it is *new* > classes): > * A facility is provided in the broker to declare its supported features, > and advertise them via its own {{BrokerIdZNode}} under a > {{features}} key. > * A facility is provided in the broker to listen to and propagate > cluster-wide *finalized* feature changes from ZK. > * When new *finalized* features are read from ZK, feature incompatibilities > are detected by comparing against the broker's own supported features. > * {{ApiVersionsResponse}} is now served containing supported and finalized > feature information (using the newly added tagged fields). -- This message was sent by Atlassian Jira (v8.3.4#803005)
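The incompatibility check described in the read-path summary above can be sketched as a simple range comparison: each cluster-wide finalized version level must fall inside the broker's own supported [minVersion, maxVersion] range for that feature. This is an illustrative stand-in with hypothetical names, not Kafka's actual feature classes.

```java
import java.util.HashMap;
import java.util.Map;

public class FeatureCompatSketch {
    // feature name -> {minVersion, maxVersion} supported by this broker
    private final Map<String, int[]> supported = new HashMap<>();

    public void declareSupported(String feature, int min, int max) {
        supported.put(feature, new int[]{min, max});
    }

    // True if the cluster-wide finalized version level is incompatible with
    // what this broker supports (unknown feature, or outside the range).
    public boolean isIncompatible(String feature, int finalizedMaxVersionLevel) {
        int[] range = supported.get(feature);
        if (range == null) return true; // feature unknown to this broker
        return finalizedMaxVersionLevel < range[0]
            || finalizedMaxVersionLevel > range[1];
    }

    public static void main(String[] args) {
        FeatureCompatSketch broker = new FeatureCompatSketch();
        broker.declareSupported("group_coordinator", 1, 3);
        System.out.println(broker.isIncompatible("group_coordinator", 2));        // false: in range
        System.out.println(broker.isIncompatible("group_coordinator", 4));        // true: above max
        System.out.println(broker.isIncompatible("transaction_coordinator", 1));  // true: unknown
    }
}
```

In the design above, a broker that detects such an incompatibility against the finalized features read from ZK would refuse to (or cease to) participate, which is why the check runs on every finalized-feature change notification.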
[jira] [Updated] (KAFKA-10028) Implement write path for feature versioning scheme
[ https://issues.apache.org/jira/browse/KAFKA-10028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10028: - Summary: Implement write path for feature versioning scheme (was: Implement write path for versioning scheme for features) > Implement write path for feature versioning scheme > -- > > Key: KAFKA-10028 > URL: https://issues.apache.org/jira/browse/KAFKA-10028 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Major > > The goal is to implement the various classes and integration for the write path of > the feature versioning system > ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). > This is preceded by the read path implementation (KAFKA-10027). The write > path implementation involves developing the new controller API, > UpdateFeatures, which enables the transactional application of a set of > cluster-wide feature updates to the ZK {{'/features'}} node, along with the > required ACL permissions. > > Details about the write path are explained [in this > part|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-ChangestoKafkaController] > of the KIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
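The "transactional application" requirement in the write-path summary above is commonly achieved with optimistic concurrency on the node's version (ZooKeeper's conditional setData works this way): read the node and its version, validate the whole batch of feature updates, then write back only if the version is unchanged, so a concurrent writer causes the entire batch to be rejected or retried rather than partially applied. The following is an in-memory sketch of that compare-and-set pattern, not the actual controller or ZK client code.

```java
import java.util.concurrent.atomic.AtomicReference;

public class VersionedFeatureNodeSketch {
    // Version and serialized feature data held together so one CAS covers both.
    static final class Node {
        final int version;
        final String data;
        Node(int version, String data) { this.version = version; this.data = data; }
    }

    private final AtomicReference<Node> node = new AtomicReference<>(new Node(0, "{}"));

    public Node read() { return node.get(); }

    // Applies the update only if no other writer has bumped the version since
    // 'expectedVersion' was read; otherwise the whole batch is rejected.
    public boolean writeIfVersion(int expectedVersion, String newData) {
        Node current = node.get();
        if (current.version != expectedVersion) return false;
        return node.compareAndSet(current, new Node(expectedVersion + 1, newData));
    }

    public static void main(String[] args) {
        VersionedFeatureNodeSketch features = new VersionedFeatureNodeSketch();
        Node snapshot = features.read();
        boolean ok = features.writeIfVersion(snapshot.version, "{\"group_coordinator\":3}");
        boolean stale = features.writeIfVersion(snapshot.version, "{\"group_coordinator\":4}");
        System.out.println(ok + " " + stale); // first write wins, stale write rejected
    }
}
```

With a real ZK client the same effect comes from passing the znode's expected version to the conditional write, so the store itself rejects writes based on a stale read.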
[jira] [Updated] (KAFKA-10028) Implement write path for versioning scheme for features
[ https://issues.apache.org/jira/browse/KAFKA-10028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10028: - Summary: Implement write path for versioning scheme for features (was: KIP-584: Implement write path for versioning scheme for features) > Implement write path for versioning scheme for features > --- > > Key: KAFKA-10028 > URL: https://issues.apache.org/jira/browse/KAFKA-10028 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Major > > The goal is to implement the various classes and integration for the write path of > the feature versioning system > ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). > This is preceded by the read path implementation (KAFKA-10027). The write > path implementation involves developing the new controller API, > UpdateFeatures, which enables the transactional application of a set of > cluster-wide feature updates to the ZK {{'/features'}} node, along with the > required ACL permissions. > > Details about the write path are explained [in this > part|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-ChangestoKafkaController] > of the KIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10028) KIP-584: Implement write path for versioning scheme for features
Kowshik Prakasam created KAFKA-10028: Summary: KIP-584: Implement write path for versioning scheme for features Key: KAFKA-10028 URL: https://issues.apache.org/jira/browse/KAFKA-10028 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam The goal is to implement the various classes and integration for the write path of the feature versioning system ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). This is preceded by the read path implementation (KAFKA-10027). The write path implementation involves developing the new controller API, UpdateFeatures, which enables the transactional application of a set of cluster-wide feature updates to the ZK {{'/features'}} node, along with the required ACL permissions. Details about the write path are explained [in this part|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-ChangestoKafkaController] of the KIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10026) KIP-584: Implement read path for versioning scheme for features
[ https://issues.apache.org/jira/browse/KAFKA-10026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam resolved KAFKA-10026. -- Resolution: Duplicate Duplicate of KAFKA-10027 > KIP-584: Implement read path for versioning scheme for features > --- > > Key: KAFKA-10026 > URL: https://issues.apache.org/jira/browse/KAFKA-10026 > Project: Kafka > Issue Type: New Feature >Reporter: Kowshik Prakasam >Priority: Major > > The goal is to implement the various classes and integration for the read path of the > feature versioning system > ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). > The ultimate plan is that the cluster-wide *finalized* features information > will be stored in ZK under the node {{/feature}}. The read path > implemented in this PR is centered around reading this *finalized* features > information from ZK and processing it inside the broker. > > Here is a summary of what's needed for this Jira (a lot of it is *new* > classes): > * A facility is provided in the broker to declare its supported features, > and advertise them via its own {{BrokerIdZNode}} under a > {{features}} key. > * A facility is provided in the broker to listen to and propagate > cluster-wide *finalized* feature changes from ZK. > * When new *finalized* features are read from ZK, feature incompatibilities > are detected by comparing against the broker's own supported features. > * {{ApiVersionsResponse}} is now served containing supported and finalized > feature information (using the newly added tagged fields). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10027) KIP-584: Implement read path for versioning scheme for features
Kowshik Prakasam created KAFKA-10027: Summary: KIP-584: Implement read path for versioning scheme for features Key: KAFKA-10027 URL: https://issues.apache.org/jira/browse/KAFKA-10027 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam The goal is to implement the various classes and integration for the read path of the feature versioning system ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). The ultimate plan is that the cluster-wide *finalized* features information will be stored in ZK under the node {{/feature}}. The read path implemented in this PR is centered around reading this *finalized* features information from ZK and processing it inside the broker. Here is a summary of what's needed for this Jira (a lot of it is *new* classes): * A facility is provided in the broker to declare its supported features, and advertise them via its own {{BrokerIdZNode}} under a {{features}} key. * A facility is provided in the broker to listen to and propagate cluster-wide *finalized* feature changes from ZK. * When new *finalized* features are read from ZK, feature incompatibilities are detected by comparing against the broker's own supported features. * {{ApiVersionsResponse}} is now served containing supported and finalized feature information (using the newly added tagged fields). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10026) KIP-584: Implement read path for versioning scheme for features
Kowshik Prakasam created KAFKA-10026: Summary: KIP-584: Implement read path for versioning scheme for features Key: KAFKA-10026 URL: https://issues.apache.org/jira/browse/KAFKA-10026 Project: Kafka Issue Type: New Feature Reporter: Kowshik Prakasam The goal is to implement the various classes and integration for the read path of the feature versioning system ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). The ultimate plan is that the cluster-wide *finalized* features information will be stored in ZK under the node {{/feature}}. The read path implemented in this PR is centered around reading this *finalized* features information from ZK and processing it inside the broker. Here is a summary of what's needed for this Jira (a lot of it is *new* classes): * A facility is provided in the broker to declare its supported features, and advertise them via its own {{BrokerIdZNode}} under a {{features}} key. * A facility is provided in the broker to listen to and propagate cluster-wide *finalized* feature changes from ZK. * When new *finalized* features are read from ZK, feature incompatibilities are detected by comparing against the broker's own supported features. * {{ApiVersionsResponse}} is now served containing supported and finalized feature information (using the newly added tagged fields). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9755) Implement versioning scheme for features
[ https://issues.apache.org/jira/browse/KAFKA-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-9755: Description: Details are in this wiki: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features] . This is the master Jira for tracking the implementation of the versioning scheme for features to facilitate client discovery and feature gating (as explained in the above wiki). was: Details are in this wiki: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features] . This Jira is for tracking the implementation of versioning scheme for features to facilitate client discovery and feature gating (as explained in the above wiki). > Implement versioning scheme for features > > > Key: KAFKA-9755 > URL: https://issues.apache.org/jira/browse/KAFKA-9755 > Project: Kafka > Issue Type: Improvement > Components: controller, core, protocol, streams >Reporter: Kowshik Prakasam >Priority: Major > > Details are in this wiki: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features] > . > This is the master Jira for tracking the implementation of the versioning scheme > for features to facilitate client discovery and feature gating (as explained > in the above wiki). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9755) Implement versioning scheme for features
Kowshik Prakasam created KAFKA-9755: --- Summary: Implement versioning scheme for features Key: KAFKA-9755 URL: https://issues.apache.org/jira/browse/KAFKA-9755 Project: Kafka Issue Type: Improvement Components: controller, core, protocol, streams Reporter: Kowshik Prakasam Details are in this wiki: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features] . This Jira is for tracking the implementation of the versioning scheme for features to facilitate client discovery and feature gating (as explained in the above wiki). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9715) TransactionStateManager: Eliminate unused reference to interBrokerProtocolVersion
[ https://issues.apache.org/jira/browse/KAFKA-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-9715: Description: In TransactionStateManager, the attribute interBrokerProtocolVersion is unused within the class. It can therefore be eliminated from the code. [https://github.com/apache/kafka/blob/07db26c20fcbccbf758591607864f7fd4bd8975f/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L78] was: In TransactionStateManager, the attribute interBrokerProtocolVersion is unused. It can therefore be eliminated from the code. [https://github.com/apache/kafka/blob/07db26c20fcbccbf758591607864f7fd4bd8975f/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L78] > TransactionStateManager: Eliminate unused reference to > interBrokerProtocolVersion > - > > Key: KAFKA-9715 > URL: https://issues.apache.org/jira/browse/KAFKA-9715 > Project: Kafka > Issue Type: Improvement >Reporter: Kowshik Prakasam >Priority: Minor > > In TransactionStateManager, the attribute interBrokerProtocolVersion is > unused within the class. It can therefore be eliminated from the code. > [https://github.com/apache/kafka/blob/07db26c20fcbccbf758591607864f7fd4bd8975f/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L78] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9715) TransactionStateManager: Eliminate unused reference to interBrokerProtocolVersion
[ https://issues.apache.org/jira/browse/KAFKA-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-9715: Description: In TransactionStateManager, the attribute interBrokerProtocolVersion is unused within the class. It can therefore be eliminated from the code. Please refer to this LOC: [https://github.com/apache/kafka/blob/07db26c20fcbccbf758591607864f7fd4bd8975f/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L78] was: In TransactionStateManager, the attribute interBrokerProtocolVersion is unused within the class. It can therefore be eliminated from the code. [https://github.com/apache/kafka/blob/07db26c20fcbccbf758591607864f7fd4bd8975f/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L78] > TransactionStateManager: Eliminate unused reference to > interBrokerProtocolVersion > - > > Key: KAFKA-9715 > URL: https://issues.apache.org/jira/browse/KAFKA-9715 > Project: Kafka > Issue Type: Improvement >Reporter: Kowshik Prakasam >Priority: Minor > > In TransactionStateManager, the attribute interBrokerProtocolVersion is > unused within the class. It can therefore be eliminated from the code. Please > refer to this LOC: > [https://github.com/apache/kafka/blob/07db26c20fcbccbf758591607864f7fd4bd8975f/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L78] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9715) TransactionStateManager: Eliminate unused reference to interBrokerProtocolVersion
Kowshik Prakasam created KAFKA-9715: --- Summary: TransactionStateManager: Eliminate unused reference to interBrokerProtocolVersion Key: KAFKA-9715 URL: https://issues.apache.org/jira/browse/KAFKA-9715 Project: Kafka Issue Type: Improvement Reporter: Kowshik Prakasam In TransactionStateManager, the attribute interBrokerProtocolVersion is unused. It can therefore be eliminated from the code. [https://github.com/apache/kafka/blob/07db26c20fcbccbf758591607864f7fd4bd8975f/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L78] -- This message was sent by Atlassian Jira (v8.3.4#803005)