Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440153281 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: @kamalcph Yes sure. Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
kamalcph commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440146317 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: @iit2009060 can we remove the `public` access modifier from this method?
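A package-private factory method is one common way to give tests a seam without widening a class's public API. The sketch below uses hypothetical classes (`SegmentReader`, `SegmentBatchStream`, `FakeBatchStream`, `StubbedSegmentReader`) standing in for `RemoteLogManager` and `RemoteLogInputStream`. It only illustrates the pattern and is not the PR's code.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Hypothetical stand-in for RemoteLogInputStream: wraps the raw segment bytes.
class SegmentBatchStream {
    private final InputStream in;

    SegmentBatchStream(InputStream in) {
        this.in = in;
    }

    InputStream source() {
        return in;
    }
}

// Hypothetical test double that a test would substitute for the real stream.
class FakeBatchStream extends SegmentBatchStream {
    FakeBatchStream(InputStream in) {
        super(in);
    }
}

class SegmentReader {
    // No access modifier means package-private: reachable from tests in the same
    // package, but not part of the class's public API.
    SegmentBatchStream createBatchStream(InputStream in) {
        return new SegmentBatchStream(in);
    }

    // Production code always obtains the stream through the factory method.
    String streamTypeFor(byte[] segmentBytes) {
        return createBatchStream(new ByteArrayInputStream(segmentBytes)).getClass().getSimpleName();
    }
}

// What a test in the same package could do: override the hook to inject a double.
class StubbedSegmentReader extends SegmentReader {
    @Override
    SegmentBatchStream createBatchStream(InputStream in) {
        return new FakeBatchStream(in);
    }
}
```

The same seam also works with a Mockito spy, as long as the test lives in the same package as the class under test.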
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1874937679 > LGTM, thanks for the patch! > > This patch handles the FETCH request for previously compacted topic segments uploaded to remote storage. We also have to go through the > > 1. `upload`, `deletion` path and > 2. RemoteLogMetadataCache internal state > > for both normal and unclean-leader-election scenarios. https://issues.apache.org/jira/browse/KAFKA-15388 As mentioned in the description, the upload and deletion paths are not impacted.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440133601 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: done
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440126550 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2065,6 +2066,11 @@ public Optional fetchRemoteLogSegmentMetadata(TopicPar return Optional.of(segmentMetadata); } +public Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, + Option leaderEpochFileCacheOption) { +return Optional.ofNullable(null); Review Comment: done. ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2161,6 +2167,85 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse } } +@Test +public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException { +FileInputStream fileInputStream = mock(FileInputStream.class); +RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class); +ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); +RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); +LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); +when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) Review Comment: done
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440116425 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: @kamalcph Do we really need to pass an actual ByteArrayInputStream? It would make it difficult to mock the behaviour.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440115636 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: @kamalcph I want to mock the RemoteLogInputStream so that I can stub the nextBatch function accordingly: in the first iteration I want firstBatch to be null, and in the next iteration I want firstBatch to be available. `when(remoteLogInputStream.nextBatch()).thenReturn(null, firstBatch);`
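The stubbing `thenReturn(null, firstBatch)` relies on Mockito's consecutive-call behaviour: the values are returned in order, and the last one keeps being returned on every later call. The plain-Java sketch below models those semantics. `SequenceStub` and `RetryUntilBatch` are hypothetical helpers, not Kafka or Mockito code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hand-rolled stand-in for Mockito's consecutive stubbing, e.g.
// when(remoteLogInputStream.nextBatch()).thenReturn(null, firstBatch):
// values come back in order, and the last one repeats once the list is exhausted.
final class SequenceStub<T> implements Supplier<T> {
    private final List<T> values;
    private int next = 0;

    @SafeVarargs
    SequenceStub(T... values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("at least one value is required");
        }
        // Arrays.asList tolerates null elements (an ArrayDeque would not).
        this.values = new ArrayList<>(Arrays.asList(values));
    }

    @Override
    public T get() {
        T value = values.get(Math.min(next, values.size() - 1));
        if (next < values.size()) {
            next++;
        }
        return value;
    }
}

final class RetryUntilBatch {
    // Hypothetical loop shape: keep asking for a batch until one is non-null,
    // giving up after maxAttempts. Returns the successful attempt number, or -1.
    static int attemptsUntilFound(Supplier<?> nextBatch, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (nextBatch.get() != null) {
                return attempt;
            }
        }
        return -1;
    }
}
```

With `new SequenceStub<>(null, "firstBatch")`, the first call yields `null` and every later call yields `"firstBatch"`. This is the two-iteration path the test wants to exercise.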
[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802024#comment-17802024 ] Satish Duggana commented on KAFKA-16073: One possible solution we discussed is to update local-log-start-offset before the segments are removed from the in-memory state and scheduled for deletion, but we need to think through the end-to-end scenarios. cc [~Kamal C] > Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed > localLogStartOffset Update During Segment Deletion > > > Key: KAFKA-16073 > URL: https://issues.apache.org/jira/browse/KAFKA-16073 > Project: Kafka > Issue Type: Bug > Components: core, Tiered-Storage >Affects Versions: 3.6.1 >Reporter: hzh0425 >Assignee: hzh0425 >Priority: Major > Labels: KIP-405, kip-405, tiered-storage > Fix For: 3.6.1 > > > The identified bug in Apache Kafka's tiered storage feature involves a > delayed update of {{localLogStartOffset}} in the > {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. > When segments are deleted from the log's memory state, the > {{localLogStartOffset}} isn't promptly updated. Concurrently, > {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch > offset is less than the {{localLogStartOffset}}. If it's greater, Kafka > erroneously sends an {{OffsetOutOfRangeException}} to the consumer. > In a specific concurrent scenario, imagine sequential offsets: {{offset1 < > offset2 < offset3}}. A client requests data at {{offset2}}. While a > background deletion process removes segments from memory, it hasn't yet > updated the {{localLogStartOffset}} from {{offset1}} to {{offset3}}. > Consequently, when the fetch offset ({{offset2}}) is evaluated against > the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, > it incorrectly triggers an {{OffsetOutOfRangeException}}.
This issue > arises from the out-of-sync update of {{localLogStartOffset}}, leading to > incorrect handling of consumer fetch requests and potential data access > errors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
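The race described above can be reduced to a small, single-threaded model in which a `Runnable` stands in for a fetch that lands between the two steps of segment deletion. All names (`TieredLogModel`, `FetchDecision`) are hypothetical. The out-of-range check is a simplification of what the issue attributes to `ReplicaManager.handleOffsetOutOfRangeError`. The second ordering only illustrates the idea from the comment, whose end-to-end scenarios still need to be thought through.

```java
// Simplified, single-threaded model of the race described in KAFKA-16073.
// Names are hypothetical; this is not Kafka's UnifiedLog or ReplicaManager code.
enum FetchDecision { READ_LOCAL, READ_REMOTE, OFFSET_OUT_OF_RANGE }

final class TieredLogModel {
    private final long logStartOffset;  // earliest offset available anywhere
    private long firstOffsetInMemory;   // first offset of the local segments still held in memory
    private long localLogStartOffset;   // the value consulted on the out-of-range path

    TieredLogModel(long logStartOffset, long localLogStartOffset) {
        this.logStartOffset = logStartOffset;
        this.firstOffsetInMemory = localLogStartOffset;
        this.localLogStartOffset = localLogStartOffset;
    }

    FetchDecision fetch(long fetchOffset) {
        if (fetchOffset >= firstOffsetInMemory) {
            return FetchDecision.READ_LOCAL;
        }
        // Local read missed: fall back to remote storage only if the offset lies in
        // the tiered range [logStartOffset, localLogStartOffset).
        if (fetchOffset >= logStartOffset && fetchOffset < localLogStartOffset) {
            return FetchDecision.READ_REMOTE;
        }
        return FetchDecision.OFFSET_OUT_OF_RANGE;
    }

    // Ordering described in the issue: segments leave memory first, the offset moves later.
    void removeSegmentsThenAdvance(long newLocalStart, Runnable concurrentFetch) {
        firstOffsetInMemory = newLocalStart;
        concurrentFetch.run(); // a fetch landing here sees a stale localLogStartOffset
        localLogStartOffset = newLocalStart;
    }

    // Ordering suggested in the comment: advance the offset, then remove the segments.
    void advanceThenRemoveSegments(long newLocalStart, Runnable concurrentFetch) {
        localLogStartOffset = newLocalStart;
        concurrentFetch.run(); // the segments are still in memory, so a local read succeeds
        firstOffsetInMemory = newLocalStart;
    }
}
```

With `offset1 = 100`, `offset2 = 150` and `offset3 = 200`, a fetch at `offset2` inside the first ordering's window is classified as out of range. Inside the second ordering's window, the same fetch is served locally. After either ordering completes, the fetch goes to remote storage.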
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
kamalcph commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440097317 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: Add one comment above the `while` loop to mention the case that we handled for posterity.
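A possible shape for the loop, including the explanatory comment requested above, is sketched below under two assumptions. First, hypothetical `Batch`, `Segment` and `FirstBatchFinder` types replace `RecordBatch`, `RemoteLogSegmentMetadata` and the remote fetch. Second, the segments are an in-memory list instead of being looked up through the metadata manager.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical, pared-down types; not Kafka's RecordBatch or RemoteLogSegmentMetadata.
final class Batch {
    final long baseOffset;
    final long lastOffset;

    Batch(long baseOffset, long lastOffset) {
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
    }
}

final class Segment {
    final long startOffset;
    final long endOffset;       // offset range the segment was uploaded for
    final List<Batch> batches;  // batches that survived compaction

    Segment(long startOffset, long endOffset, Batch... batches) {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.batches = Arrays.asList(batches);
    }
}

final class FirstBatchFinder {
    // Compaction can remove every record at or after `target` from the segment whose
    // offset range contains it. In that case no batch in that segment qualifies, so the
    // search moves on to the next segment and scans it from the beginning.
    static Optional<Batch> findFirstBatch(List<Segment> segmentsInOffsetOrder, long target) {
        for (Segment segment : segmentsInOffsetOrder) {
            if (segment.endOffset < target) {
                continue; // segment lies entirely before the target offset
            }
            for (Batch batch : segment.batches) {
                if (batch.lastOffset >= target) {
                    return Optional.of(batch);
                }
            }
            // No qualifying batch in this segment: try the next one.
        }
        return Optional.empty();
    }
}
```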
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
kamalcph commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440089289 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: can we reduce the method's access modifier to package-private? I would suggest removing this method and returning the actual stream instead of mockStream for `remoteStorageManager.fetchLogSegment`: ```java new ByteArrayInputStream(records(ts, startOffset, targetLeaderEpoch).buffer().array()) ``` ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2161,6 +2167,85 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse } } +@Test +public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException { +FileInputStream fileInputStream = mock(FileInputStream.class); +RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class); +ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); +RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); +LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); +when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) Review Comment: There are two remote storage managers: `rsmManager` (local) and `remoteStorageManager` (global). Can we discard the latter one?
## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2065,6 +2066,11 @@ public Optional fetchRemoteLogSegmentMetadata(TopicPar return Optional.of(segmentMetadata); } +public Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, + Option leaderEpochFileCacheOption) { +return Optional.ofNullable(null); Review Comment: Can we use `Optional.empty()` instead?
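On the last point, `Optional.ofNullable(null)` and `Optional.empty()` produce equal empty values, so the suggestion is purely about readability. A minimal check follows; the `OptionalEmptyDemo` class is illustrative only.

```java
import java.util.Optional;

final class OptionalEmptyDemo {
    // Both return an empty Optional. Optional.empty() says so directly, while
    // Optional.ofNullable(null) makes the reader work out that the argument is null.
    static Optional<String> viaOfNullable() {
        return Optional.ofNullable(null);
    }

    static Optional<String> viaEmpty() {
        return Optional.empty();
    }
}
```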
Re: [PR] KAFKA-15742: KRaft support in GroupCoordinatorIntegrationTest [kafka]
wernerdv commented on code in PR #15086: URL: https://github.com/apache/kafka/pull/15086#discussion_r1440092493 ## core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala: ## @@ -33,18 +34,21 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.id.toString) - override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { + override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnectOrNull, enableControlledShutdown = false).map { KafkaConfig.fromProps(_, overridingProps) } - @Test - def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum: String): Unit = { val consumer = TestUtils.createConsumer(bootstrapServers()) val offsetMap = Map( new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "") ).asJava consumer.commitSync(offsetMap) -val logManager = servers.head.getLogManager +val logManager = + if (isKRaftTest()) brokers.head.logManager Review Comment: yep, fixed
[jira] [Assigned] (KAFKA-16070) Extract the setReadOnly method into Headers
[ https://issues.apache.org/jira/browse/KAFKA-16070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] johndoe reassigned KAFKA-16070: --- Assignee: johndoe > Extract the setReadOnly method into Headers > --- > > Key: KAFKA-16070 > URL: https://issues.apache.org/jira/browse/KAFKA-16070 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Lighting Sui >Assignee: johndoe >Priority: Major > > Abstract the setReadOnly function into the Headers interface and provide a > default implementation. > The setReadOnly function in Headers can be overridden by RecordHeaders, so > that the setReadOnly function in KafkaProducer can be removed and the code > can be cleaned up.
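The proposal relies on Java interface default methods. The sketch below uses hypothetical `SimpleHeaders`, `SimpleRecordHeaders` and `ProducerSide` types rather than Kafka's real `Headers` and `RecordHeaders`, whose full method sets are not reproduced here. It shows that the caller no longer needs a type check once the method lives on the interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, pared-down interface; the real org.apache.kafka.common.header.Headers
// declares many more methods. Only the setReadOnly idea from the issue is modelled.
interface SimpleHeaders {
    SimpleHeaders add(String key, byte[] value);

    // Default implementation: headers without a read-only mode simply ignore the call.
    default void setReadOnly() {
    }
}

final class SimpleRecordHeaders implements SimpleHeaders {
    private final List<String> keys = new ArrayList<>();
    private boolean readOnly = false;

    @Override
    public SimpleHeaders add(String key, byte[] value) {
        if (readOnly) {
            throw new IllegalStateException("headers are read-only");
        }
        keys.add(key);
        return this;
    }

    // Overrides the default, mirroring the issue's suggestion for RecordHeaders.
    @Override
    public void setReadOnly() {
        readOnly = true;
    }

    int size() {
        return keys.size();
    }
}

final class ProducerSide {
    // With the method on the interface, the caller needs no instanceof check or cast.
    static void freeze(SimpleHeaders headers) {
        headers.setReadOnly();
    }
}
```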
[jira] [Assigned] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran reassigned KAFKA-15561: -- Assignee: Phuc Hong Tran > Client support for new SubscriptionPattern based subscription > -- > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > The new consumer should support subscribing with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server.
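Delegating resolution to the server means the client only carries the pattern string, and the broker matches it against the topic names it knows. The sketch below is hypothetical (`TopicRegexResolver` is not a Kafka class). It uses `java.util.regex` with full-name matching, and the regex dialect and matching rules that KIP-848 specifies may differ.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical server-side sketch: the client sends only the pattern string
// (the issue names the heartbeat field SubscribedTopicRegex), and the broker
// matches it against the topics it knows about.
final class TopicRegexResolver {
    static List<String> resolve(String subscribedTopicRegex, List<String> knownTopics) {
        Pattern pattern = Pattern.compile(subscribedTopicRegex);
        return knownTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches()) // whole-name match
                .sorted()
                .collect(Collectors.toList());
    }
}
```

Resolving on the broker means newly created matching topics can be picked up without the client refreshing its own topic list.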
[jira] [Assigned] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments
[ https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran reassigned KAFKA-15682: -- Assignee: Phuc Hong Tran > Ensure internal remote log metadata topic does not expire its segments before > deleting user-topic segments > -- > > Key: KAFKA-15682 > URL: https://issues.apache.org/jira/browse/KAFKA-15682 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Phuc Hong Tran >Priority: Major > > One of the implementations of RemoteLogMetadataManager is > TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic > {{__remote_log_metadata}} to store the metadata about the remote log > segments. Unlike other internal topics, which are compaction-enabled, this > topic does not have compaction enabled and its retention is set to unlimited. > Keeping this internal topic's retention unlimited is not practical in real-world > use cases, where the topic's local disk usage footprint grows huge over a > period of time. > It is assumed that the user will set the retention to a reasonable time such > that it is the max of all the user-created topics (max + X). We can't just > rely on this assumption and need an assertion to ensure that the internal > {{__remote_log_metadata}} segments are not eligible for deletion before the > expiry of all the relevant user-topic uploaded remote-log-segments; > otherwise there will be dangling remote-log-segments which won't be cleared > once all the brokers are restarted after the internal topic retention cleanup. > See the discussion thread: > https://github.com/apache/kafka/pull/14576#discussion_r1368576126
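For time-based retention, the assertion the issue asks for could take roughly the following form. `MetadataRetentionCheck` and its buffer parameter (the "X" above) are hypothetical. Size-based retention and per-partition details are ignored.

```java
import java.util.Collection;

// Hypothetical assertion (not an existing Kafka API) for the invariant in the issue:
// __remote_log_metadata must not expire before the remote segments of every user
// topic it describes, i.e. its retention >= max(user-topic retention) + a buffer.
// Values are retention.ms settings, where -1 means "no time limit".
final class MetadataRetentionCheck {
    static final long UNLIMITED = -1L;

    static boolean isSafe(long metadataRetentionMs, Collection<Long> userTopicRetentionsMs, long bufferMs) {
        if (metadataRetentionMs == UNLIMITED) {
            return true; // metadata is never deleted by time
        }
        long maxUserRetention = 0L;
        for (long retention : userTopicRetentionsMs) {
            if (retention == UNLIMITED) {
                return false; // a user topic keeps remote data forever, but the metadata would expire
            }
            maxUserRetention = Math.max(maxUserRetention, retention);
        }
        return metadataRetentionMs >= maxUserRetention + bufferMs;
    }
}
```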
[jira] [Resolved] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id
[ https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran resolved KAFKA-16034. Resolution: Fixed > AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on > fenced/unknown member Id > --- > > Key: KAFKA-16034 > URL: https://issues.apache.org/jira/browse/KAFKA-16034 > Project: Kafka > Issue Type: Sub-task >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Major > Fix For: 3.8.0 > > > The consumer will log invalid request error when joining from fenced/unknown > member id because we didn't reset the HeartbeatState and we won't send the > needed fields (rebalanceTimeoutMs for example) when joining.
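The failure mode can be shown with a simplified model. Suppose the heartbeat builder omits fields that have not changed since the last request. A member that rejoins after being fenced must then reset that state first, otherwise its join request lacks `rebalanceTimeoutMs`. `HeartbeatStateModel` and its map keys are hypothetical and do not reproduce the client's actual `HeartbeatState` class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of an incremental heartbeat builder: fields that have not
// changed since the last sent request are omitted. Names are illustrative only.
final class HeartbeatStateModel {
    private Integer sentRebalanceTimeoutMs = null; // null = never sent (or reset)

    Map<String, Object> buildRequest(int memberEpoch, int rebalanceTimeoutMs) {
        Map<String, Object> request = new LinkedHashMap<>();
        request.put("MemberEpoch", memberEpoch);
        if (sentRebalanceTimeoutMs == null || sentRebalanceTimeoutMs != rebalanceTimeoutMs) {
            request.put("RebalanceTimeoutMs", rebalanceTimeoutMs);
            sentRebalanceTimeoutMs = rebalanceTimeoutMs;
        }
        return request;
    }

    // To be called when the member is fenced or gets UNKNOWN_MEMBER_ID, so that the
    // rejoin request (epoch 0) carries every field a join requires.
    void reset() {
        sentRebalanceTimeoutMs = null;
    }
}
```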
[jira] [Commented] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id
[ https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802013#comment-17802013 ] Phuc Hong Tran commented on KAFKA-16034: [~pnee] PlainTextConsumerTest isn't failing with any fenced/unknown_member_id exception. We can close this one then.
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
ashwinpankaj commented on code in PR #15101: URL: https://github.com/apache/kafka/pull/15101#discussion_r1440041522 ## core/src/test/java/kafka/test/junit/LeakTestingExtension.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.junit; + +import kafka.utils.TestUtils; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class LeakTestingExtension implements AfterEachCallback { +@Override +public void afterEach(ExtensionContext extensionContext) { +TestUtils.verifyNoUnexpectedThreads("@AfterEach"); Review Comment: Thanks @wernerdv. I had a basic question about this change: [TestUtils.verifyNoUnexpectedThreads](https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2432) seems to check only for the presence of specific types of threads. Will this check be sufficient to catch all types of thread leaks?
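A generic alternative to checking for specific thread names is to snapshot all live threads before a test and report any new, still-running threads afterwards. `ThreadLeakChecker` is a hypothetical sketch, not an existing Kafka or JUnit utility. In a JUnit 5 extension, the snapshot could be taken in a `BeforeEachCallback` and checked in `afterEach`. Note that this approach would also flag legitimate background threads that the JVM or libraries start during a test, so an allow-list would likely still be needed.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical generic leak check: remember every live thread at construction time,
// then report threads that appeared since and are still alive.
final class ThreadLeakChecker {
    private final Set<Thread> before;

    ThreadLeakChecker() {
        // Thread.getAllStackTraces() returns a map keyed by all live threads.
        this.before = new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    Set<Thread> leakedThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> !before.contains(t) && t.isAlive())
                .collect(Collectors.toSet());
    }
}
```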
Re: [PR] [MINOR] remove meaningless lines [kafka]
github-actions[bot] commented on PR #14423: URL: https://github.com/apache/kafka/pull/14423#issuecomment-1874810052 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802000#comment-17802000 ] hzh0425 commented on KAFKA-16073: - ping [~satish.duggana]: when you are free, please help check whether there are any other solutions.
[jira] [Assigned] (KAFKA-14510) Extend DescribeConfigs API to support group configs
[ https://issues.apache.org/jira/browse/KAFKA-14510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jimmy Wang reassigned KAFKA-14510: -- Assignee: Jimmy Wang > Extend DescribeConfigs API to support group configs > --- > > Key: KAFKA-14510 > URL: https://issues.apache.org/jira/browse/KAFKA-14510 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Jimmy Wang >Priority: Major >
Re: [PR] MINOR: Fix flaky test RemoteIndexCacheTest.testClose() [kafka]
jolshan commented on PR #15108: URL: https://github.com/apache/kafka/pull/15108#issuecomment-1874744763 Do we know why we sometimes get interrupted? I think this is a useful change though; when I zoomed out to a month, the failures were more frequent. https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=America%2FLos_Angeles=kafka.log.remote.RemoteIndexCacheTest=testClose()
Re: [PR] KAFKA-15742: KRaft support in GroupCoordinatorIntegrationTest [kafka]
jolshan commented on code in PR #15086: URL: https://github.com/apache/kafka/pull/15086#discussion_r1439990671 ## core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala: ## @@ -33,18 +34,21 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.id.toString) - override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { + override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnectOrNull, enableControlledShutdown = false).map { KafkaConfig.fromProps(_, overridingProps) } - @Test - def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum: String): Unit = { val consumer = TestUtils.createConsumer(bootstrapServers()) val offsetMap = Map( new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "") ).asJava consumer.commitSync(offsetMap) -val logManager = servers.head.getLogManager +val logManager = + if (isKRaftTest()) brokers.head.logManager Review Comment: Can we use brokers.head.logManager for Zk tests too?
Re: [PR] KAFKA-15742: KRaft support in GroupCoordinatorIntegrationTest [kafka]
jolshan commented on PR #15086: URL: https://github.com/apache/kafka/pull/15086#issuecomment-1874741304 `testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(String).quorum=kraft PASSED` Looks like it is passing on all builds.
Re: [PR] MINOR: Increase parallelism for Jenkins [kafka]
jolshan commented on PR #15099: URL: https://github.com/apache/kafka/pull/15099#issuecomment-1874738065 @divijvaidya did you accidentally include the KafkaApisTest change here?
Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]
jolshan commented on PR #14599: URL: https://github.com/apache/kafka/pull/14599#issuecomment-1874737282 Thanks @MikeEdgar, it looks like the test is fixed. I'll run the build again to make sure there aren't any other related tests failing.
Re: [PR] KAFKA-16046: also fix stores for outer join [kafka]
ableegoldman merged PR #15073: URL: https://github.com/apache/kafka/pull/15073
Re: [PR] KAFKA-16046: also fix stores for outer join [kafka]
ableegoldman commented on PR #15073: URL: https://github.com/apache/kafka/pull/15073#issuecomment-1874668715 Test failures are unrelated; merging to trunk and will cherry-pick to 3.7. cc @stanislavkozlovski
[PR] KAFKA-16076: Interrupting the currentThread fix [kafka]
metao1 opened a new pull request, #15110: URL: https://github.com/apache/kafka/pull/15110 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-16076) RestClient Interrupting the thread in case of InterruptedException
[ https://issues.apache.org/jira/browse/KAFKA-16076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mehrdad Karami updated KAFKA-16076: --- Description: In the RestClient class, httpRequest is called from different threads. InterruptedException is caught as one of its failure cases, but the handler never calls Thread.currentThread().interrupt(). In general, it is good practice to call this so that the rest of the code knows the thread has already been interrupted. Note: Some methods that make a thread wait or sleep (like {{Thread.sleep()}}) check this flag. If they see it is set, they stop waiting/sleeping and throw an {{InterruptedException}}. was: In the RestClient class, `httpRequest` is called from different threads. `InterruptedException` is caught as one of its failure cases, but the handler never calls `Thread.currentThread().interrupt()`. In general, it is good practice to call this so that the rest of the code knows the thread has already been interrupted. Note: Some methods that make a thread wait or sleep (like `Thread.sleep()`) check this flag. If they see it is set, they stop waiting/sleeping and throw an `InterruptedException`. > RestClient Interrupting the thread in case of InterruptedException > -- > > Key: KAFKA-16076 > URL: https://issues.apache.org/jira/browse/KAFKA-16076 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Mehrdad Karami >Priority: Minor > Labels: easyfix -- This message was sent by Atlassian Jira (v8.20.10#820010)
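A minimal sketch of the pattern the ticket asks for. `InterruptRestoreSketch`, `sendRequest` and `httpRequest` here are hypothetical names, and `Thread.sleep` stands in for the blocking HTTP call; this is not the actual RestClient code. The catch block re-sets the interrupt flag before translating the exception, because throwing `InterruptedException` clears it.

```java
// Hypothetical sketch; sendRequest stands in for RestClient's blocking HTTP call.
final class InterruptRestoreSketch {

    // Thread.sleep throws InterruptedException right away if the
    // interrupt flag is already set when it is called.
    static String sendRequest(long waitMs) throws InterruptedException {
        Thread.sleep(waitMs);
        return "response";
    }

    static String httpRequest(long waitMs) {
        try {
            return sendRequest(waitMs);
        } catch (InterruptedException e) {
            // The flag was cleared when the exception was thrown; restore it so
            // callers and later blocking calls on this thread still see it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the REST response", e);
        }
    }
}
```

After `httpRequest` throws, `Thread.currentThread().isInterrupted()` returns `true`; without the `interrupt()` call in the catch block it would return `false`.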
[jira] [Created] (KAFKA-16076) RestClient Interrupting the thread in case of InterruptedException
Mehrdad Karami created KAFKA-16076: -- Summary: RestClient Interrupting the thread in case of InterruptedException Key: KAFKA-16076 URL: https://issues.apache.org/jira/browse/KAFKA-16076 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Mehrdad Karami In the RestClient class, `httpRequest` is called from different threads. `InterruptedException` is caught as one of its failure cases, but the handler never calls `Thread.currentThread().interrupt()`. In general, it is good practice to call this so that the rest of the code knows the thread has already been interrupted. Note: Some methods that make a thread wait or sleep (like `Thread.sleep()`) check this flag. If they see it is set, they stop waiting/sleeping and throw an `InterruptedException`.
Re: [PR] KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories [kafka]
ableegoldman commented on code in PR #15088: URL: https://github.com/apache/kafka/pull/15088#discussion_r1439872897 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1229,10 +1229,21 @@ private void tryToLockAllNonEmptyTaskDirectories() { final String namedTopology = taskDir.namedTopology(); try { final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology); -if (stateDirectory.lock(id)) { -lockedTaskDirectories.add(id); -if (!allTasks.containsKey(id)) { -log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id); +boolean lockedEmptyDirectory = false; +try { +if (stateDirectory.lock(id)) { +if (stateDirectory.directoryForTaskIsEmpty(id)) { +lockedEmptyDirectory = true; Review Comment: This is maybe a bit pedantic, but I feel like it would be easier to understand the flow & intent of this code if we just unlocked it directly right here (instead of introducing the `lockedEmptyDirectory` flag and then unlocking it later on). Also, since we log a debug message when we do lock the directory, we might as well put a similar debug message in this branch too. Something like `"Released lock on empty directory for task {}"`
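A minimal sketch of the flow the reviewer suggests: take the lock, and if the directory turns out to be empty, release it right there in the same branch and log it, with no `lockedEmptyDirectory` flag. `TaskDirectoryLocks` and `LockNonEmptySketch` are hypothetical stand-ins. Only the method names `lock`, `unlock` and `directoryForTaskIsEmpty` come from the diff, and their signatures here are assumptions. Task ids are plain strings, and `System.out` stands in for the debug logger.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for StateDirectory; task ids are plain strings here.
final class TaskDirectoryLocks {
    private final Set<String> held = new HashSet<>();
    private final Predicate<String> isEmpty;

    TaskDirectoryLocks(Predicate<String> isEmpty) {
        this.isEmpty = isEmpty;
    }

    boolean lock(String taskId) {
        return held.add(taskId); // false if this stand-in already holds the lock
    }

    void unlock(String taskId) {
        held.remove(taskId);
    }

    boolean directoryForTaskIsEmpty(String taskId) {
        return isEmpty.test(taskId);
    }

    boolean isLocked(String taskId) {
        return held.contains(taskId);
    }
}

final class LockNonEmptySketch {

    // Returns the ids that stay locked: only directories that are still
    // non-empty once the lock is held.
    static Set<String> tryToLockAllNonEmptyTaskDirectories(TaskDirectoryLocks dirs, List<String> candidates) {
        Set<String> lockedTaskDirectories = new HashSet<>();
        for (String id : candidates) {
            if (dirs.lock(id)) {
                if (dirs.directoryForTaskIsEmpty(id)) {
                    // Checked while holding the lock, so a cleanup that ran after the
                    // directory listing is caught here; release immediately.
                    dirs.unlock(id);
                    System.out.println("Released lock on empty directory for task " + id);
                } else {
                    lockedTaskDirectories.add(id);
                }
            }
        }
        return lockedTaskDirectories;
    }
}
```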
[jira] [Commented] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801927#comment-17801927 ] A. Sophie Blee-Goldman commented on KAFKA-16025: Nice find and writeup of the race condition – I'll take a look at the fix > Streams StateDirectory has orphaned locks after rebalancing, blocking future > rebalancing > > > Key: KAFKA-16025 > URL: https://issues.apache.org/jira/browse/KAFKA-16025 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.0 > Environment: Linux >Reporter: Sabit >Priority: Major > > Hello, > > We are encountering an issue where during rebalancing, we see streams threads > on one client get stuck in rebalancing. Upon enabling debug logs, we saw that > some tasks were having issues initializing due to failure to grab a lock in > the StateDirectory: > > {{2023-12-14 22:51:57.352000Z stream-thread > [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: > stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] > Failed to lock the state directory for task 0_51; will retry}} > > We were able to reproduce this behavior reliably on 3.4.0. This is the > sequence that triggers the bug. > Assume in a streams consumer group, there are 5 instances (A, B, C, D, E), > each with 5 threads (1-5), and the consumer is using stateful tasks which > have state stores on disk. There are 10 active tasks and 10 standby tasks. 
> # Instance A is deactivated > # As an example, let's say task 0_1, previously on instance B, moves to > instance C > # Task 0_1 leaves behind its state directory on Instance B's disk, > currently unused, and no lock for it exists in Instance B's StateDirectory > in-memory lock tracker > # Instance A is re-activated > # Streams thread 1 on Instance B is asked to re-join the consumer group due > to a new member being added > # As part of re-joining, thread 1 lists non-empty state directories in order > to report the offsets it has in its state stores as part of its metadata. > Thread 1 sees that the directory for 0_1 is not empty. > # The cleanup thread on instance B runs. The cleanup thread locks state > store 0_1, sees the directory for 0_1 was last modified more than > `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully > # Thread 1 takes a lock on directory 0_1 because it was found non-empty > before, unaware that the cleanup has run between the time of the check and > the lock. It tracks this lock in its own in-memory store, in addition to > StateDirectory's in-memory lock store > # Thread 1 successfully joins the consumer group > # After every consumer in the group joins the group, assignments are > calculated, and then every consumer calls sync group to receive the new > assignments > # Thread 1 on Instance B calls sync group but gets an error - the group > coordinator has triggered a new rebalance and all members must rejoin the > group > # Thread 1 again lists non-empty state directories in order to report the > offsets it has in its state stores as part of its metadata.
Prior to doing > so, it clears its in-memory store tracking the locks it has taken for the > purpose of gathering rebalance metadata > # Thread 1 no longer takes a lock on 0_1 as it is empty > # However, that lock on 0_1 owned by Thread 1 remains in StateDirectory > # All consumers re-join and sync successfully, receiving their new > assignments > # Thread 2 on Instance B is assigned task 0_1 > # Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is > still being held by Thread 1 > # Thread 2 remains in rebalancing state, and cannot make progress on task > 0_1, or any other tasks it has been assigned.
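The core of the bug is that there are two records of the same lock. Thread 1 clears its own bookkeeping, but StateDirectory's shared record still names it as the owner. This toy model replays steps 8, 12-13 and 17-18 to show that symptom. `OrphanedLockModel` and the string thread names are hypothetical, and this is not the real StateDirectory. It only illustrates the problem. The fix proposed in PR #15088 takes a different route: it re-checks emptiness after the lock is taken.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the two lock records described in the ticket.
final class OrphanedLockModel {
    // Shared record, like StateDirectory's in-memory lock map: taskId -> owning thread.
    private final Map<String, String> ownerByTask = new HashMap<>();

    boolean lock(String taskId, String threadName) {
        String owner = ownerByTask.putIfAbsent(taskId, threadName);
        return owner == null || owner.equals(threadName); // re-entrant for the same owner
    }

    // Replays steps 8, 12-13 and 17-18 of the ticket for task 0_1.
    static boolean thread2CanLockAfterThread1Rejoins() {
        OrphanedLockModel stateDirectory = new OrphanedLockModel();
        Set<String> thread1Locked = new HashSet<>(); // thread 1's own bookkeeping

        // Step 8: thread 1 locks 0_1, unaware that cleanup just emptied it.
        if (stateDirectory.lock("0_1", "StreamThread-1")) {
            thread1Locked.add("0_1");
        }

        // Steps 12-13: on the second rejoin thread 1 clears its own bookkeeping
        // and skips 0_1 because it is now empty, but never releases the shared lock.
        thread1Locked.clear();

        // Steps 17-18: thread 2 is assigned 0_1 and tries to lock it.
        return stateDirectory.lock("0_1", "StreamThread-2");
    }
}
```

`thread2CanLockAfterThread1Rejoins()` returns `false`: the shared record still names thread 1 as the owner, even though thread 1 no longer tracks the lock.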
Re: [PR] MINOR: Prototype: Measure CI slowness [kafka]
gharris1727 closed pull request #15008: MINOR: Prototype: Measure CI slowness URL: https://github.com/apache/kafka/pull/15008
[jira] [Commented] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801924#comment-17801924 ] A. Sophie Blee-Goldman commented on KAFKA-16055: See this discussion on the user mailing list for additional context: [https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol] > Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders > > > Key: KAFKA-16055 > URL: https://issues.apache.org/jira/browse/KAFKA-16055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Kohei Nozaki >Priority: Minor > Labels: newbie, newbie++ > > This was originally raised in [a kafka-users > post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]. > There is a HashMap stored in QueryableStoreProvider#storeProviders ([code > link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39]) > which can be mutated by a KafkaStreams#removeStreamThread() call. This can > be problematic when KafkaStreams#store is called from a separate thread. > We need to somehow make this part of the code thread-safe by replacing it with a > ConcurrentHashMap and/or using an existing locking mechanism.
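A minimal sketch of the first option in the ticket, replacing the HashMap with a ConcurrentHashMap. It assumes the map is keyed by stream-thread name, and `StoreProviderRegistry` and its methods are hypothetical, not the QueryableStoreProvider API. With this change, one thread can remove an entry while another iterates, and no ConcurrentModificationException is thrown. Iteration is only weakly consistent, though: a reader may or may not see a provider that is being removed at the same moment, so this alone does not give a consistent snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry modeled on QueryableStoreProvider#storeProviders.
final class StoreProviderRegistry<P> {
    // Keyed by stream-thread name (assumption); safe for concurrent add/remove/iterate.
    private final Map<String, P> storeProviders = new ConcurrentHashMap<>();

    void addStoreProviderForThread(String threadName, P provider) {
        storeProviders.put(threadName, provider);
    }

    // Would be reached from the KafkaStreams#removeStreamThread() path.
    void removeStoreProviderForThread(String threadName) {
        storeProviders.remove(threadName);
    }

    // Would be reached from KafkaStreams#store on an application thread.
    List<P> providersSnapshot() {
        // Copying from a ConcurrentHashMap never throws ConcurrentModificationException,
        // even while another thread is adding or removing entries.
        return new ArrayList<>(storeProviders.values());
    }
}
```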
[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16055: --- Labels: newbie newbie++ (was: )
[jira] [Created] (KAFKA-16075) TLS configuration not validated in KRaft controller-only nodes
Jakub Scholz created KAFKA-16075: Summary: TLS configuration not validated in KRaft controller-only nodes Key: KAFKA-16075 URL: https://issues.apache.org/jira/browse/KAFKA-16075 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.6.1 Reporter: Jakub Scholz When a Kafka broker node (either a broker in a ZooKeeper-based cluster or a node with the broker role in a KRaft cluster) has an incorrect TLS configuration, such as an unsupported TLS cipher suite, it seems to throw a {{ConfigException}} and shut down: {code:java} 2024-01-02 13:50:24,895 ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$) [main] org.apache.kafka.common.config.ConfigException: Invalid value java.lang.IllegalArgumentException: Unsupported CipherSuite: TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305 for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings. at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:102) at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:73) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107) at kafka.network.Processor.<init>(SocketServer.scala:973) at kafka.network.Acceptor.newProcessor(SocketServer.scala:879) at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:849) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at kafka.network.Acceptor.addProcessors(SocketServer.scala:848) at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:523) at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:251) at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at
scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) at scala.collection.AbstractIterable.foreach(Iterable.scala:933) at kafka.network.SocketServer.<init>(SocketServer.scala:175) at kafka.server.BrokerServer.startup(BrokerServer.scala:242) at kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:96) at kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:96) at scala.Option.foreach(Option.scala:437) at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:96) at kafka.Kafka$.main(Kafka.scala:113) at kafka.Kafka.main(Kafka.scala) {code} But in KRaft controller-only nodes, such validation does not seem to happen, and the node keeps running and looping with this warning: {code:java} 2024-01-02 13:53:10,186 WARN [RaftManager id=1] Error connecting to node my-cluster-controllers-0.my-cluster-kafka-brokers.myproject.svc.cluster.local:9090 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient) [kafka-1-raft-outbound-request-thread] java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed] at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348) at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) at org.apache.kafka.common.network.Selector.connect(Selector.java:256) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1032) at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) at org.apache.kafka.server.util.InterBrokerSendThread.sendRequests(InterBrokerSendThread.java:145) at org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:108) at org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:136) at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130) Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Unsupported CipherSuite:
TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305 at org.apache.kafka.common.network.SslChannelBuilder.buildChannel(SslChannelBuilder.java:111) at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338) ... 8 more Caused by: java.lang.IllegalArgumentException: Unsupported CipherSuite: TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305 at java.base/sun.security.ssl.CipherSuite.validValuesOf(CipherSuite.java:978) at java.base/sun.security.ssl.SSLEngineImpl.setEnabledCipherSuites(SSLEngineImpl.java:864) at
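One way to get the broker's fail-fast behavior on a controller-only node is to run the check once at startup. This sketch, written under several assumptions, builds an SSLEngine from the JDK default SSLContext and applies the configured suites, turning the `IllegalArgumentException` from the traces above into a fatal configuration error. `CipherSuiteCheck` and `validateCipherSuites` are hypothetical names. This is not how Kafka's SslFactory is structured, and the sketch ignores keystores, truststores and protocol settings.

```java
import java.security.NoSuchAlgorithmException;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

// Hypothetical startup check; not Kafka's SslFactory.
final class CipherSuiteCheck {

    // Throws IllegalStateException if any configured suite is not supported by the JDK.
    static void validateCipherSuites(List<String> configuredSuites) {
        final SSLEngine engine;
        try {
            engine = SSLContext.getDefault().createSSLEngine();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
        try {
            // The same call that fails later in the raft outbound thread
            // (SSLEngineImpl.setEnabledCipherSuites in the trace), done up front.
            engine.setEnabledCipherSuites(configuredSuites.toArray(new String[0]));
        } catch (IllegalArgumentException e) {
            throw new IllegalStateException("Invalid ssl.cipher.suites: " + e.getMessage(), e);
        }
    }
}
```

Calling a check like this before the node starts its network threads would make a misconfigured controller exit at startup, instead of looping on connection warnings.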
[jira] [Commented] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id
[ https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801921#comment-17801921 ] Philip Nee commented on KAFKA-16034: Hi [~phuctran] - I think the issue might already have been fixed. Please try to reproduce it using PlainTextConsumerTest. > AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on > fenced/unknown member Id > --- > > Key: KAFKA-16034 > URL: https://issues.apache.org/jira/browse/KAFKA-16034 > Project: Kafka > Issue Type: Sub-task >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Major > Fix For: 3.8.0 > > > The consumer will log invalid request error when joining from fenced/unknown > member id because we didn't reset the HeartbeatState and we won't send the > needed fields (rebalanceTimeoutMs for example) when joining.
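A sketch of the mechanism the ticket describes. It assumes the heartbeat state leaves out fields it has already sent, so a rejoin without a reset goes out without `rebalanceTimeoutMs`. `HeartbeatFieldsSketch`, `buildRequest` and `reset` are hypothetical names. Only the idea comes from the ticket: reset the state when fenced or unknown, so the join request carries the full set of fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a heartbeat state that omits fields already sent.
final class HeartbeatFieldsSketch {
    private final int rebalanceTimeoutMs;
    private boolean rebalanceTimeoutSent = false; // reset to false on fencing

    HeartbeatFieldsSketch(int rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    // Builds the request fields; rebalanceTimeoutMs is included only if it has
    // not been sent since the last reset.
    Map<String, Object> buildRequest(String memberId, int memberEpoch) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("memberId", memberId);
        fields.put("memberEpoch", memberEpoch);
        if (!rebalanceTimeoutSent) {
            fields.put("rebalanceTimeoutMs", rebalanceTimeoutMs);
            rebalanceTimeoutSent = true;
        }
        return fields;
    }

    // Call when the member is fenced or unknown, so the next join is a full request.
    void reset() {
        rebalanceTimeoutSent = false;
    }
}
```

Without the `reset()` call, the request sent on rejoin would lack `rebalanceTimeoutMs`, which matches the invalid-request symptom in the ticket.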
Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]
philipnee commented on code in PR #15020: URL: https://github.com/apache/kafka/pull/15020#discussion_r1439651419 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ## @@ -70,8 +70,10 @@ protected void maybeThrowAuthFailure(Node node) { */ @Override public PollResult poll(long currentTimeMs) { +boolean checkNodeAvailability = false; + return pollInternal( -prepareFetchRequests(), +prepareFetchRequests(checkNodeAvailability), Review Comment: Just pass `false`; the var above is unnecessary. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -403,7 +405,7 @@ protected Map prepareCloseFetchSessi * Create fetch requests for all nodes for which we have assigned partitions * that have no existing requests in flight. */ -protected Map prepareFetchRequests() { +protected Map prepareFetchRequests(boolean checkNodeAvailability) { Review Comment: `final boolean` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -372,7 +372,7 @@ Node selectReadReplica(final TopicPartition partition, final Node leaderReplica, } } -protected Map prepareCloseFetchSessionRequests() { +protected Map prepareCloseFetchSessionRequests(boolean checkNodeAvailability) { Review Comment: `final boolean checkNodeAvailability` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -428,37 +430,53 @@ protected Map prepareFetchRequests() // Use the preferred read replica if set, otherwise the partition's leader Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs); -if (isUnavailable(node)) { -maybeThrowAuthFailure(node); - -// If we try to send during the reconnect backoff window, then the request is just -// going to be failed anyway before being sent, so skip sending the request for now -log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node); -} else if
(nodesWithPendingFetchRequests.contains(node.id())) { -log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node); +if (checkNodeAvailability) { Review Comment: Is it possible to avoid the nested `if`? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -102,7 +102,9 @@ public void clearBufferedDataForUnassignedPartitions(Collection * @return number of fetches sent */ public synchronized int sendFetches() { -final Map fetchRequests = prepareFetchRequests(); +boolean checkNodeAvailability = true; Review Comment: Same as above. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -385,7 +385,9 @@ protected Map prepareCloseFetchSessi // skip sending the close request. final Node fetchTarget = cluster.nodeById(fetchTargetNodeId); -if (fetchTarget == null || isUnavailable(fetchTarget)) { +boolean fetchTargetAvailability = checkNodeAvailability ? (fetchTarget == null || isUnavailable(fetchTarget)) : fetchTarget == null; Review Comment: `final boolean` - I would also call this `isFetchTargetAvailable` - let's not use an inline `if`; it makes the code harder to read.
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ## @@ -82,9 +84,14 @@ public PollResult poll(long currentTimeMs) { */ @Override public PollResult pollOnClose() { + +boolean checkNodeAvailability = false; + + // TODO: move the logic to poll to handle signal close + return pollInternal( -prepareCloseFetchSessionRequests(), +prepareCloseFetchSessionRequests(checkNodeAvailability), Review Comment: ditto ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -781,10 +781,10 @@ public void testFetchSkipsBlackedOutNodes() { Node node = initialUpdateResponse.brokers().iterator().next(); client.backoff(node, 500); -assertEquals(0, sendFetches()); +assertEquals(1, sendFetches()); Review Comment: Why are you changing the tests?
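The ternary in the `prepareCloseFetchSessionRequests` hunk can be written as one boolean expression with no inline `if`, for example `fetchTarget == null || (checkNodeAvailability && isUnavailable(fetchTarget))`. Because `||` short-circuits, `isUnavailable` is still never called with a null target. The small sketch below checks that the two forms agree on every input. The null check and `isUnavailable` are modeled as plain boolean parameters, and the method names are hypothetical. Note that the expression is `true` when the close request should be *skipped*, so a name like `isFetchTargetAvailable` would need the negated expression.

```java
// Hypothetical check that the flattened condition matches the ternary in the diff.
final class FetchTargetCheckSketch {

    // Form from the diff: true means "skip sending the close request".
    static boolean ternaryForm(boolean checkNodeAvailability, boolean targetIsNull, boolean targetIsUnavailable) {
        return checkNodeAvailability ? (targetIsNull || targetIsUnavailable) : targetIsNull;
    }

    // Same condition without an inline if: a missing target is always skipped,
    // an unavailable one only when availability checking is enabled.
    static boolean flatForm(boolean checkNodeAvailability, boolean targetIsNull, boolean targetIsUnavailable) {
        return targetIsNull || (checkNodeAvailability && targetIsUnavailable);
    }

    // Compares both forms over all eight input combinations.
    static boolean formsAgree() {
        boolean[] values = {false, true};
        for (boolean check : values) {
            for (boolean isNull : values) {
                for (boolean unavailable : values) {
                    if (ternaryForm(check, isNull, unavailable) != flatForm(check, isNull, unavailable)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }
}
```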
Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]
ijuma commented on PR #15109: URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874508632 I guess one way would be to enable the cache only for CI & trunk by default at first. Then we could manually enable the cache locally and for one PR to test the behavior. If it all looks good, we would then enable the cache for everyone (but leave pushes for trunk && CI only).
Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]
ijuma commented on PR #15109: URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874507122 Could we do a test with a new branch (not trunk) where we validate the behavior before we roll it out to everyone else?
Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]
nicktelford commented on PR #15109: URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874500874 Hi @divijvaidya, this enables the _remote_ build cache, which allows different machines to share the cached output of tasks. Thanks for the links to the builds. I haven't had a chance to look at them yet, but the thing that requires access to ge would be to simulate a trunk CI run and push to the cache, which I won't be able to do locally.
Re: [PR] KAFKA-16063: Prevent memory leak by Disabling shutdownhook [kafka]
divijvaidya merged PR #15104: URL: https://github.com/apache/kafka/pull/15104
Re: [PR] KAFKA-16063: Prevent memory leak by Disabling shutdownhook [kafka]
divijvaidya commented on PR #15104: URL: https://github.com/apache/kafka/pull/15104#issuecomment-1874493514 The test failures are not in classes that use MiniKdc, which is changed in this PR. The only test that uses kdc and fails is `SaslScramSslEndToEndAuthorizationTest`, but it was successful in earlier CI runs.
Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]
divijvaidya commented on PR #15109: URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874483500 Hey @nicktelford, you don't need committer permissions to look at ASF Gradle. Try using this link: https://ge.apache.org, it should be publicly accessible. For this PR, here's the build - https://ge.apache.org/s/bdwopqcoxwge2 which shows `Build Cache` as `On`, unlike this build on trunk https://ge.apache.org/s/ywmt7vaghr62u where `Build Cache` is `Off`. I am not sure how useful this will be, since CI for trunk may build on different boxes, but it's definitely worth keeping it on. @ijuma is our Gradle expert in the community. I will wait to hear his thoughts regarding this.
[jira] [Commented] (KAFKA-16051) Deadlock on connector initialization
[ https://issues.apache.org/jira/browse/KAFKA-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801909#comment-17801909 ] Greg Harris commented on KAFKA-16051: - I added permissions for you on Jira and assigned the ticket to you. It is expected that you don't have permission to add reviewers directly on GitHub. In the future, you can at-mention reviewers (both other contributors and committers) and ask for a review, without using the "ask for review" feature on GitHub, which requires special permissions. > Deadlock on connector initialization > > > Key: KAFKA-16051 > URL: https://issues.apache.org/jira/browse/KAFKA-16051 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.6.3, 3.6.1 >Reporter: Octavian Ciubotaru >Assignee: Octavian Ciubotaru >Priority: Major > > > Tested with Kafka 3.6.1 and 2.6.3. > The only plugin installed is confluentinc-kafka-connect-jdbc-10.7.4. > Stack trace for Kafka 3.6.1: > {noformat} > Found one Java-level deadlock: > = > "pool-3-thread-1": > waiting to lock monitor 0x7fbc88006300 (object 0x91002aa0, a > org.apache.kafka.connect.runtime.standalone.StandaloneHerder), > which is held by "Thread-9" > "Thread-9": > waiting to lock monitor 0x7fbc88008800 (object 0x9101ccd8, a > org.apache.kafka.connect.storage.MemoryConfigBackingStore), > which is held by "pool-3-thread-1" Java stack information for the threads > listed above: > === > "pool-3-thread-1": > at > org.apache.kafka.connect.runtime.standalone.StandaloneHerder$ConfigUpdateListener.onTaskConfigUpdate(StandaloneHerder.java:516) > - waiting to lock <0x91002aa0> (a > org.apache.kafka.connect.runtime.standalone.StandaloneHerder) > at > org.apache.kafka.connect.storage.MemoryConfigBackingStore.putTaskConfigs(MemoryConfigBackingStore.java:137) > - locked <0x9101ccd8> (a > org.apache.kafka.connect.storage.MemoryConfigBackingStore) > at >
org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:483) > at > org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$null$2(StandaloneHerder.java:229) > at > org.apache.kafka.connect.runtime.standalone.StandaloneHerder$$Lambda$692/0x000840557440.run(Unknown > Source) > at > java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.21/Executors.java:515) > at > java.util.concurrent.FutureTask.run(java.base@11.0.21/FutureTask.java:264) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@11.0.21/ScheduledThreadPoolExecutor.java:304) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.21/ThreadPoolExecutor.java:1128) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.21/ThreadPoolExecutor.java:628) > at java.lang.Thread.run(java.base@11.0.21/Thread.java:829) > "Thread-9": > at > org.apache.kafka.connect.storage.MemoryConfigBackingStore.putTaskConfigs(MemoryConfigBackingStore.java:129) > - waiting to lock <0x9101ccd8> (a > org.apache.kafka.connect.storage.MemoryConfigBackingStore) > at > org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:483) > at > org.apache.kafka.connect.runtime.standalone.StandaloneHerder.requestTaskReconfiguration(StandaloneHerder.java:255) > - locked <0x91002aa0> (a > org.apache.kafka.connect.runtime.standalone.StandaloneHerder) > at > org.apache.kafka.connect.runtime.HerderConnectorContext.requestTaskReconfiguration(HerderConnectorContext.java:50) > at > org.apache.kafka.connect.runtime.WorkerConnector$WorkerConnectorContext.requestTaskReconfiguration(WorkerConnector.java:548) > at > io.confluent.connect.jdbc.source.TableMonitorThread.run(TableMonitorThread.java:86) > Found 1 deadlock. > {noformat} > The jdbc source connector is loading tables from the database and updates the > configuration once the list is available. 
The deadlock is very consistent in > my environment, probably because the database is on the same machine. > Maybe it is possible to avoid this situation by always locking the herder > first and the config backing store second. From what I see, > updateConnectorTasks sometimes is called before locking on herder and other > times it is not. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
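The thread dump shows a classic lock-order inversion: `pool-3-thread-1` holds the config store monitor and waits for the herder, while `Thread-9` holds the herder and waits for the config store. The reporter suggests always locking the herder first and the config backing store second. A minimal sketch of that suggestion follows; it is not the fix adopted upstream, and `LockOrderingSketch` and its methods are hypothetical stand-ins for `StandaloneHerder` and `MemoryConfigBackingStore`. Both code paths take the two monitors in the same order, so no wait cycle can form.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class LockOrderingSketch {
    // Hypothetical stand-ins for the two monitors in the thread dump.
    private final Object herderLock = new Object();
    private final Object configStoreLock = new Object();
    private int taskConfigUpdates = 0;

    // Path 1 (like Thread-9): a connector thread requests task reconfiguration.
    void requestTaskReconfiguration() {
        synchronized (herderLock) {          // herder first ...
            synchronized (configStoreLock) { // ... then the config store
                taskConfigUpdates++;
            }
        }
    }

    // Path 2 (like pool-3-thread-1): task configs are written and listeners notified.
    // In the dump this path took the store lock first; here it follows the same order.
    void putTaskConfigsAndNotify() {
        synchronized (herderLock) {
            synchronized (configStoreLock) {
                taskConfigUpdates++;
            }
        }
    }

    int taskConfigUpdates() {
        synchronized (configStoreLock) {
            return taskConfigUpdates;
        }
    }

    // Runs both paths concurrently. Returns false if they do not finish in time,
    // which is how a lock-order deadlock would show up.
    boolean runConcurrently(int iterations) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<?> connector = pool.submit(() -> {
                for (int i = 0; i < iterations; i++) requestTaskReconfiguration();
            });
            Future<?> executor = pool.submit(() -> {
                for (int i = 0; i < iterations; i++) putTaskConfigsAndNotify();
            });
            connector.get(10, TimeUnit.SECONDS);
            executor.get(10, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

If either method instead nested the locks in the opposite order, the two threads could reproduce the cycle from the dump and `runConcurrently` would report a timeout.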
[jira] [Assigned] (KAFKA-16051) Deadlock on connector initialization
[ https://issues.apache.org/jira/browse/KAFKA-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-16051: --- Assignee: Octavian Ciubotaru
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
gharris1727 commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1439665912 ## clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java: ## @@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() { public static class TestFileConfigProvider extends FileConfigProvider { @Override -protected Reader reader(String path) throws IOException { +protected Reader reader(Path path) throws IOException { return new StringReader("testKey=testResult\ntestKey2=testResult2"); } } + +@Test +public void testAllowedDirPath() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dir); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(dirFile); +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testAllowedFilePath() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(dirFile); +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testMultipleAllowedPaths() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir); +configProvider.configure(configs); + +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); + +ConfigData configData = configProvider.get(dirFile); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); + +configData = configProvider.get(siblingDirFile); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testNotAllowedDirPath() { +Map configs = new HashMap<>(); 
+configs.put(ALLOWED_PATHS_CONFIG, dir); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(siblingDirFile); +assertTrue(configData.data().isEmpty()); +assertNull(configData.ttl()); +} + +@Test +public void testNotAllowedFilePath() throws IOException { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +//another file under the same directory +Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2")); +ConfigData configData = configProvider.get(dirFile2.toString()); +assertTrue(configData.data().isEmpty()); +assertNull(configData.ttl()); +} + +@Test +public void testNoTraversal() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +// Check we can't escape outside the path directory +ConfigData configData = configProvider.get(dirFile + Paths.get("/../siblingdir/siblingdirFile")); Review Comment: This is still an invalid path traversal attack. The duplicate test in DirectoryConfigProviderTest appears to be fine. Have you looked into deduplicating these two classes? That would also avoid requiring two different tests for the same traversal attack. 
## clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java: ## @@ -153,5 +159,57 @@ public void testServiceLoaderDiscovery() { ServiceLoader serviceLoader = ServiceLoader.load(ConfigProvider.class); assertTrue(StreamSupport.stream(serviceLoader.spliterator(), false).anyMatch(configProvider -> configProvider instanceof DirectoryConfigProvider)); } + +@Test +public void testAllowedPath() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, parent.getAbsolutePath()); +provider.configure(configs); + +ConfigData configData = provider.get(dir); +assertEquals(toSet(asList(foo, bar)), configData.data().keySet()); +assertEquals("FOO", configData.data().get(foo)); +assertEquals("BAR", configData.data().get(bar)); +assertNull(configData.ttl()); +} + +@Test +public void testMultipleAllowedPaths() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir); +provider.configure(configs); + +ConfigData configData = provider.get(subdir); +assertEquals(toSet(asList(subdirFileName)),
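One reading of the reviewer's remark on `testNoTraversal`: in `dirFile + Paths.get("/../siblingdir/siblingdirFile")`, the single `..` only cancels the `dirFile` segment, so the path normalizes to `dir/siblingdir/siblingdirFile`. That path never leaves `dir` and does not point at the real sibling file, so the test passes without exercising an escape. Reaching the actual sibling directory takes two `..` segments. The sketch below demonstrates this with hypothetical paths under `/tmp/parent`; `isAllowed` is a hypothetical helper, not the provider's actual check.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class TraversalSketch {
    // Hypothetical check: a requested path is allowed only if its normalized form
    // equals or lies under one of the normalized allowed paths.
    // Path.startsWith compares whole name elements, so "/a/dir" does not match "/a/dirX".
    static boolean isAllowed(List<Path> allowedPaths, String requested) {
        Path normalized = Paths.get(requested).normalize();
        for (Path allowed : allowedPaths) {
            if (normalized.startsWith(allowed.normalize())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String dirFile = "/tmp/parent/dir/dirFile";
        // One "..": cancels only "dirFile", so the result stays inside /tmp/parent/dir.
        Path oneUp = Paths.get(dirFile + "/../siblingdir/siblingdirFile").normalize();
        // Two "..": climbs out of dir and reaches the real sibling directory.
        Path twoUp = Paths.get(dirFile + "/../../siblingdir/siblingdirFile").normalize();
        System.out.println(oneUp);
        System.out.println(twoUp);
        System.out.println(isAllowed(Arrays.asList(Paths.get(dirFile)), twoUp.toString()));
    }
}
```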
[jira] [Commented] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801904#comment-17801904 ] Almog Gavra commented on KAFKA-16046: - https://github.com/apache/kafka/pull/15073 > Stream Stream Joins fail after restoration with deserialization exceptions > -- > > Key: KAFKA-16046 > URL: https://issues.apache.org/jira/browse/KAFKA-16046 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Almog Gavra >Assignee: Almog Gavra >Priority: Blocker > Labels: streams > Fix For: 3.7.0 > > > Before KIP-954, the `KStreamImplJoin` class would always create > non-timestamped persistent windowed stores. After that KIP, the default was > changed to create timestamped stores. This wasn't compatible because, during > restoration, timestamped stores have their changelog values transformed to > prepend the timestamp to the value. This caused serialization errors when > trying to read from the store because the deserializers did not expect the > timestamp to be prepended.
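The KAFKA-16046 description explains the failure mode: restoring into a timestamped store prepends a timestamp to each changelog value, so a deserializer that expects the plain value sees extra bytes. The sketch below reproduces that shape with plain `ByteBuffer` code. The layout (an 8-byte timestamp followed by the raw value) is an assumption made for illustration, and none of these methods are Kafka Streams APIs.

```java
import java.nio.ByteBuffer;

public class TimestampedValueSketch {
    // Plain value format for this sketch: a 4-byte big-endian int.
    static byte[] serializeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Strict deserializer that only understands the plain format.
    static int deserializeInt(byte[] bytes) {
        if (bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("Expected 4 bytes but got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Assumed timestamped layout: 8-byte timestamp, then the raw value bytes.
    static byte[] prependTimestamp(long timestamp, byte[] rawValue) {
        return ByteBuffer.allocate(Long.BYTES + rawValue.length)
                .putLong(timestamp)
                .put(rawValue)
                .array();
    }

    // A reader that knows the layout strips the timestamp before deserializing.
    static byte[] stripTimestamp(byte[] timestamped) {
        byte[] raw = new byte[timestamped.length - Long.BYTES];
        System.arraycopy(timestamped, Long.BYTES, raw, 0, raw.length);
        return raw;
    }

    public static void main(String[] args) {
        byte[] restored = prependTimestamp(1_700_000_000_000L, serializeInt(42));
        System.out.println(deserializeInt(stripTimestamp(restored))); // 42
        try {
            deserializeInt(restored); // plain deserializer on timestamped bytes
        } catch (IllegalArgumentException e) {
            System.out.println("Plain deserializer failed: " + e.getMessage());
        }
    }
}
```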
[jira] [Reopened] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Almog Gavra reopened KAFKA-16046.
Re: [PR] KAFKA-15215: docs for KIP-954 [kafka]
ableegoldman commented on PR #14949: URL: https://github.com/apache/kafka/pull/14949#issuecomment-1874410202 Merged to trunk and cherry-picked to 3.7
Re: [PR] KAFKA-15215: docs for KIP-954 [kafka]
ableegoldman merged PR #14949: URL: https://github.com/apache/kafka/pull/14949
Re: [PR] KAFKA-15215: docs for KIP-954 [kafka]
ableegoldman commented on PR #14949: URL: https://github.com/apache/kafka/pull/14949#issuecomment-1874406419 FYI @agavra, try to always fill out a PR description, even if it's super short/simple, so we have something for the commit message. I'll just make it up for this case, but going forward it's on you.
Re: [PR] KAFKA-15215: docs for KIP-954 [kafka]
ableegoldman commented on PR #14949: URL: https://github.com/apache/kafka/pull/14949#issuecomment-1874405243 OK, this has dragged on for long enough. I'm just going to merge this even though we haven't gotten a single individual run without any build failures, because (a) we have at least one yellow build without compiler errors for each JDK if you look across the last three runs, and (b) this only touches docs anyway, so the failures are pretty clearly unrelated. cc @stanislavkozlovski: going to merge this to 3.7 since it's just feature docs for 3.7.
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
mimaison commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1439616708 ## clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java: ## @@ -43,36 +49,36 @@ public class DirectoryConfigProviderTest { private DirectoryConfigProvider provider; private File parent; -private File dir; -private File bar; -private File foo; -private File subdir; -private File subdirFile; -private File siblingDir; -private File siblingDirFile; -private File siblingFile; - -private static File writeFile(File file) throws IOException { -Files.write(file.toPath(), file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8)); -return file; +private String dir; +private final String bar = "bar"; +private final String foo = "foo"; +private String subdir; +private final String subdirFileName = "subdirFile"; +private String siblingDir; +private final String siblingDirFileName = "siblingDirFile"; +private final String siblingFileName = "siblingFile"; + +private static Path writeFile(Path path) throws IOException { +return Files.write(path, String.valueOf(path.getFileName()).toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8)); } @BeforeEach public void setup() throws IOException { provider = new DirectoryConfigProvider(); provider.configure(Collections.emptyMap()); parent = TestUtils.tempDirectory(); -dir = new File(parent, "dir"); -dir.mkdir(); -foo = writeFile(new File(dir, "foo")); -bar = writeFile(new File(dir, "bar")); -subdir = new File(dir, "subdir"); -subdir.mkdir(); -subdirFile = writeFile(new File(subdir, "subdirFile")); -siblingDir = new File(parent, "siblingdir"); -siblingDir.mkdir(); -siblingDirFile = writeFile(new File(siblingDir, "siblingdirFile")); -siblingFile = writeFile(new File(parent, "siblingFile")); + +dir = String.valueOf(Files.createDirectory(Paths.get(parent.getAbsolutePath(), "dir"))); Review Comment: I think we can just use `.toString()` here as 
`Files.createDirectory()` should never return `null`. ## clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java: ## @@ -44,8 +47,23 @@ public class DirectoryConfigProvider implements ConfigProvider { private static final Logger log = LoggerFactory.getLogger(DirectoryConfigProvider.class); +public static final String ALLOWED_PATHS_CONFIG = "allowed.paths"; +public static final String ALLOWED_PATHS_DOC = "Path that this config provider is allowed to access"; Review Comment: `Path that this config` -> `A comma separated list of paths that this config` ## clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java: ## @@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider { private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class); +public static final String ALLOWED_PATHS_CONFIG = "allowed.paths"; +public static final String ALLOWED_PATHS_DOC = "Path that this config provider is allowed to access"; +private List allowedPaths; + public void configure(Map configs) { +if (configs.containsKey(ALLOWED_PATHS_CONFIG)) { +String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG); + +if (configValue != null && !configValue.isEmpty()) { Review Comment: Should we throw or log something if this field is set to a bad value? 
## clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java: ## @@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider { private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class); +public static final String ALLOWED_PATHS_CONFIG = "allowed.paths"; +public static final String ALLOWED_PATHS_DOC = "Path that this config provider is allowed to access"; +private List allowedPaths; + public void configure(Map configs) { +if (configs.containsKey(ALLOWED_PATHS_CONFIG)) { +String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG); + +if (configValue != null && !configValue.isEmpty()) { +allowedPaths = new ArrayList<>(); +Arrays.stream(configValue.split(",")).forEach(b -> allowedPaths.add(Paths.get(b).normalize())); +} +} else { +allowedPaths = null; Review Comment: Could we initialize the field when it's declared and remove this `else` block? ## clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java: ## @@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
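The review raises two design points for `configure()`: initialize `allowedPaths` where it is declared so the `else` branch disappears, and decide what to do with a bad value. A minimal sketch of one possible answer, assuming an empty list means "no restriction" and that malformed values should fail fast with an exception (logging would be the other option the reviewer mentions). `AllowedPathsSketch` is hypothetical, not the code in the PR.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class AllowedPathsSketch {
    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";

    // Initialized at declaration, so no else branch is needed.
    // In this sketch an empty list means "no restriction".
    private List<Path> allowedPaths = Collections.emptyList();

    public void configure(Map<String, ?> configs) {
        Object raw = configs.get(ALLOWED_PATHS_CONFIG);
        if (raw == null) {
            return; // not set: keep the unrestricted default
        }
        String value = raw.toString().trim();
        if (value.isEmpty()) {
            throw new IllegalArgumentException(ALLOWED_PATHS_CONFIG + " must not be empty when set");
        }
        List<Path> parsed = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                throw new IllegalArgumentException("Empty entry in " + ALLOWED_PATHS_CONFIG + ": " + value);
            }
            parsed.add(Paths.get(trimmed).normalize());
        }
        allowedPaths = Collections.unmodifiableList(parsed);
    }

    public List<Path> allowedPaths() {
        return allowedPaths;
    }
}
```

Note that `String.split` drops trailing empty strings, so a trailing comma is silently ignored rather than rejected.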
Re: [PR] KAFKA-15816: Fix leaked sockets in trogdor tests [kafka]
gharris1727 commented on PR #14771: URL: https://github.com/apache/kafka/pull/14771#issuecomment-1874318692 @divijvaidya Thanks for the review. I applied your suggestion. PTAL, thanks!
Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]
jolshan commented on PR #15093: URL: https://github.com/apache/kafka/pull/15093#issuecomment-1874304438 Thanks folks for fixing this stuff :)
[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets
[ https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15816: Description: There are a few tests which leak network sockets due to small typos in the tests themselves. Clients: [https://github.com/apache/kafka/pull/14750] (DONE) * NioEchoServer * KafkaConsumerTest * KafkaProducerTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest * SaslAuthenticatorTest Core: [https://github.com/apache/kafka/pull/14754] * MiniKdc * GssapiAuthenticationTest * MirrorMakerIntegrationTest * SocketServerTest * EpochDrivenReplicationProtocolAcceptanceTest * LeaderEpochIntegrationTest Trogdor: [https://github.com/apache/kafka/pull/14771] * AgentTest Mirror: [https://github.com/apache/kafka/pull/14761] (DONE) * DedicatedMirrorIntegrationTest * MirrorConnectorsIntegrationTest * MirrorConnectorsWithCustomForwardingAdminIntegrationTest Runtime: [https://github.com/apache/kafka/pull/14764] (DONE) * ConnectorTopicsIntegrationTest * ExactlyOnceSourceIntegrationTest * WorkerTest * WorkerGroupMemberTest Streams: [https://github.com/apache/kafka/pull/14769] (DONE) * IQv2IntegrationTest * MetricsReporterIntegrationTest * NamedTopologyIntegrationTest * PurgeRepartitionTopicIntegrationTest These can be addressed by just fixing the tests. was: There are a few tests which leak network sockets due to small typos in the tests themselves. 
Clients: [https://github.com/apache/kafka/pull/14750] (DONE) * NioEchoServer * KafkaConsumerTest * KafkaProducerTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest * SaslAuthenticatorTest Core: [https://github.com/apache/kafka/pull/14754] * MiniKdc * GssapiAuthenticationTest * MirrorMakerIntegrationTest * SocketServerTest * EpochDrivenReplicationProtocolAcceptanceTest * LeaderEpochIntegrationTest Trogdor: [https://github.com/apache/kafka/pull/14771] * AgentTest Mirror: [https://github.com/apache/kafka/pull/14761] (DONE) * DedicatedMirrorIntegrationTest * MirrorConnectorsIntegrationTest * MirrorConnectorsWithCustomForwardingAdminIntegrationTest Runtime: [https://github.com/apache/kafka/pull/14764] * ConnectorTopicsIntegrationTest * ExactlyOnceSourceIntegrationTest * WorkerTest * WorkerGroupMemberTest Streams: [https://github.com/apache/kafka/pull/14769] (DONE) * IQv2IntegrationTest * MetricsReporterIntegrationTest * NamedTopologyIntegrationTest * PurgeRepartitionTopicIntegrationTest These can be addressed by just fixing the tests. > Typos in tests leak network sockets > --- > > Key: KAFKA-15816 > URL: https://issues.apache.org/jira/browse/KAFKA-15816 > Project: Kafka > Issue Type: Bug > Components: unit tests >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > There are a few tests which leak network sockets due to small typos in the > tests themselves. 
> Clients: [https://github.com/apache/kafka/pull/14750] (DONE) > * NioEchoServer > * KafkaConsumerTest > * KafkaProducerTest > * SelectorTest > * SslTransportLayerTest > * SslTransportTls12Tls13Test > * SslVersionsTransportLayerTest > * SaslAuthenticatorTest > Core: [https://github.com/apache/kafka/pull/14754] > * MiniKdc > * GssapiAuthenticationTest > * MirrorMakerIntegrationTest > * SocketServerTest > * EpochDrivenReplicationProtocolAcceptanceTest > * LeaderEpochIntegrationTest > Trogdor: [https://github.com/apache/kafka/pull/14771] > * AgentTest > Mirror: [https://github.com/apache/kafka/pull/14761] (DONE) > * DedicatedMirrorIntegrationTest > * MirrorConnectorsIntegrationTest > * MirrorConnectorsWithCustomForwardingAdminIntegrationTest > Runtime: [https://github.com/apache/kafka/pull/14764] (DONE) > * ConnectorTopicsIntegrationTest > * ExactlyOnceSourceIntegrationTest > * WorkerTest > * WorkerGroupMemberTest > Streams: [https://github.com/apache/kafka/pull/14769] (DONE) > * IQv2IntegrationTest > * MetricsReporterIntegrationTest > * NamedTopologyIntegrationTest > * PurgeRepartitionTopicIntegrationTest > These can be addressed by just fixing the tests.
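The ticket does not say what the individual typos were. A common way to guarantee that a test releases every socket it opens, even when an assertion fails midway, is try-with-resources. A minimal sketch, not taken from the linked PRs: `SocketCleanupSketch` is hypothetical and simply checks that the listening socket, the client socket, and the accepted socket are all closed once the block exits.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketCleanupSketch {
    // Opens a loopback server plus one client connection inside try-with-resources
    // and reports whether all three sockets are closed once the block exits.
    static boolean allClosedAfterUse() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        ServerSocket serverRef;
        Socket clientRef;
        Socket acceptedRef;
        try (ServerSocket server = new ServerSocket(0, 50, loopback); // port 0 = any free port
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            serverRef = server;
            clientRef = client;
            acceptedRef = accepted;
            // ... exercise the code under test here ...
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Resources are closed in reverse declaration order when the try block exits.
        return serverRef.isClosed() && clientRef.isClosed() && acceptedRef.isClosed();
    }
}
```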
Re: [PR] KAFKA-16046: also fix stores for outer join [kafka]
lucasbru commented on PR #15073: URL: https://github.com/apache/kafka/pull/15073#issuecomment-1874248887 @stanislavkozlovski @mjsax This is a blocker issue for 3.7
Re: [PR] Bugfix: Prevent java.lang.UnsupportedOperationException. [kafka]
vamossagar12 commented on PR #14955: URL: https://github.com/apache/kafka/pull/14955#issuecomment-1874218869 @divijvaidya, do you mind taking a look at this one? Once the checkstyle issues are fixed, it looks good to go from my side (I will approve once the changes are made).
Re: [PR] Bugfix: Prevent java.lang.UnsupportedOperationException. [kafka]
vamossagar12 commented on code in PR #14955: URL: https://github.com/apache/kafka/pull/14955#discussion_r1439583795 ## clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java: ## @@ -1014,7 +1014,9 @@ synchronized public DescribeLogDirsResult describeLogDirs(Collection br for (Node node : nodes) { Map logDirDescriptionMap = unwrappedResults.get(node.id()); LogDirDescription logDirDescription = logDirDescriptionMap.getOrDefault(partitionLogDirs.get(0), new LogDirDescription(null, new HashMap<>())); -logDirDescription.replicaInfos().put(new TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 0, false)); +Map topicPartitionReplicaInfoMap = new HashMap<>(logDirDescription.replicaInfos()); +topicPartitionReplicaInfoMap.put(new TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 0, false)); +logDirDescriptionMap.put(partitionLogDirs.get(0), new LogDirDescription(logDirDescription.error(), topicPartitionReplicaInfoMap)); Review Comment: Thanks
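The diff stops writing into the map returned by `logDirDescription.replicaInfos()`. Instead it copies that map, adds the entry to the copy, and stores a new `LogDirDescription` back into `logDirDescriptionMap`. That pattern is needed whenever a getter returns a read-only view; the PR title ("Prevent java.lang.UnsupportedOperationException") suggests that is the case here. The sketch below shows the same copy-modify-replace pattern without the real Kafka classes: `Description` is a hypothetical stand-in whose getter returns an unmodifiable map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapSketch {
    // Hypothetical immutable holder whose getter returns a read-only map.
    static final class Description {
        private final Map<String, Integer> entries;

        Description(Map<String, Integer> entries) {
            this.entries = Collections.unmodifiableMap(new HashMap<>(entries));
        }

        Map<String, Integer> entries() {
            return entries;
        }
    }

    // Copy the read-only view, modify the copy, and build a new holder.
    // The caller must store the returned object; the original is unchanged.
    static Description withEntry(Description d, String key, int value) {
        Map<String, Integer> copy = new HashMap<>(d.entries());
        copy.put(key, value);
        return new Description(copy);
    }

    public static void main(String[] args) {
        Description original = new Description(new HashMap<>());
        try {
            original.entries().put("topic-0", 0); // mutating the view directly
        } catch (UnsupportedOperationException e) {
            System.out.println("direct put rejected");
        }
        Description updated = withEntry(original, "topic-0", 0);
        System.out.println(updated.entries()); // {topic-0=0}
    }
}
```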
Re: [PR] KAFKA-9545: Fix IllegalStateException in updateLags [kafka]
lucasbru merged PR #15096: URL: https://github.com/apache/kafka/pull/15096
Re: [PR] MINOR: avoid unnecessary UnsupportedOperationException [kafka]
mjsax merged PR #15102: URL: https://github.com/apache/kafka/pull/15102
[jira] [Updated] (KAFKA-16059) Fix leak of ExpirationReaper-1-AlterAcls threads in :core:test
[ https://issues.apache.org/jira/browse/KAFKA-16059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16059: - Fix Version/s: 3.7.0 3.8.0 > Fix leak of ExpirationReaper-1-AlterAcls threads in :core:test > -- > > Key: KAFKA-16059 > URL: https://issues.apache.org/jira/browse/KAFKA-16059 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Divij Vaidya >Priority: Major > Fix For: 3.7.0, 3.8.0 > > Attachments: Screenshot 2023-12-29 at 11.13.01.png > > > We are leaking hundreds of ExpirationReaper-1-AlterAcls threads in one of the > tests in :core:test > {code:java} > "ExpirationReaper-1-AlterAcls" prio=0 tid=0x0 nid=0x0 waiting on condition > java.lang.Thread.State: TIMED_WAITING > on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3688fc67 > at java.base@17.0.9/jdk.internal.misc.Unsafe.park(Native Method) > at > java.base@17.0.9/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:252) > at > java.base@17.0.9/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1672) > at > java.base@17.0.9/java.util.concurrent.DelayQueue.poll(DelayQueue.java:265) > at > app//org.apache.kafka.server.util.timer.SystemTimer.advanceClock(SystemTimer.java:87) > at > app//kafka.server.DelayedOperationPurgatory.advanceClock(DelayedOperation.scala:418) > at > app//kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:444) > at > app//org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > {code} > The objective of this Jira is to identify the test and fix this leak
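To find which test leaks the reaper threads, one option is to count live threads by name prefix after each test. A minimal sketch of such a check; `ThreadLeakSketch` and its methods are hypothetical helpers, not the approach taken in the linked PR. It relies only on `Thread.getAllStackTraces()`, which returns a map keyed by every live thread.

```java
public class ThreadLeakSketch {
    // Counts live JVM threads whose name starts with the given prefix,
    // e.g. "ExpirationReaper-" for the purgatory reaper threads named in the ticket.
    static long countLiveThreads(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    // Polls until no matching thread is alive or the timeout elapses.
    // Returns true if all matching threads have exited.
    static boolean awaitNoLiveThreads(String prefix, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (countLiveThreads(prefix) > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Called from an `@AfterEach` method, a failing `awaitNoLiveThreads("ExpirationReaper-", ...)` would point at the test that did not shut down its purgatory.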
[jira] [Commented] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer
[ https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801817#comment-17801817 ] Kamal Chandraprakash commented on KAFKA-15777: -- [~isding_l] This task requires a KIP, as we may have to add a new config to the consumer/broker. > Configurable remote fetch bytes per partition from Consumer > --- > > Key: KAFKA-15777 > URL: https://issues.apache.org/jira/browse/KAFKA-15777 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > A consumer can configure the amount of local bytes to read from each > partition in the FETCH request. > {{fetch.max.bytes}} = 50 MB > {{max.partition.fetch.bytes}} = 1 MB > Similar to this, the consumer should be able to configure > {{max.remote.partition.fetch.bytes}} = 4 MB. > While handling the {{FETCH}} request, if we encounter a partition to read > data from remote storage, then the rest of the partitions in the request are > ignored. Essentially, we are serving only 1 MB of remote data per FETCH > request when all the partitions in the request are to be served from the > remote storage. > Providing one more configuration to the client helps the user to tune the > values depending on their storage plugin. The user might want to optimise the > number of calls to remote storage vs amount of bytes returned back to the > client in the FETCH response. > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]
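KAFKA-15777 contrasts the existing consumer limits with a proposed remote-specific one. The sketch sets the two existing consumer properties (`fetch.max.bytes`, `max.partition.fetch.bytes`) at the values quoted in the ticket, plus the proposed `max.remote.partition.fetch.bytes`, which does not exist in any released client and would need the KIP mentioned above. The `fetchesNeeded` helper is a hypothetical illustration of the trade-off the ticket describes: if each FETCH serves at most one partition's worth of remote data, draining a remote backlog takes ceil(backlog / per-request bytes) requests.

```java
import java.util.Properties;

public class RemoteFetchConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Existing consumer configs, at the values quoted in the ticket.
        props.setProperty("fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
        props.setProperty("max.partition.fetch.bytes", String.valueOf(1024 * 1024));
        // PROPOSED in KAFKA-15777 only: no released consumer recognizes this key.
        props.setProperty("max.remote.partition.fetch.bytes", String.valueOf(4 * 1024 * 1024));
        return props;
    }

    // Number of FETCH requests needed to drain `remoteBytes` when each request
    // returns at most `bytesPerFetch` bytes of remote data (ceiling division).
    static long fetchesNeeded(long remoteBytes, long bytesPerFetch) {
        if (bytesPerFetch <= 0) {
            throw new IllegalArgumentException("bytesPerFetch must be positive");
        }
        return (remoteBytes + bytesPerFetch - 1) / bytesPerFetch;
    }

    public static void main(String[] args) {
        long backlog = 100L * 1024 * 1024; // hypothetical 100 MB of remote data
        System.out.println(fetchesNeeded(backlog, 1024 * 1024));     // 100
        System.out.println(fetchesNeeded(backlog, 4 * 1024 * 1024)); // 25
    }
}
```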
[jira] [Updated] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer
[ https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15777: - Labels: kip (was: ) > Configurable remote fetch bytes per partition from Consumer > --- > > Key: KAFKA-15777 > URL: https://issues.apache.org/jira/browse/KAFKA-15777 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Labels: kip
Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]
nicktelford commented on PR #15109: URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874134447 I don't have access to the ASF Gradle Enterprise, so a committer will need to verify that this works as intended. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Enable Gradle Remote Build Cache [kafka]
nicktelford opened a new pull request, #15109: URL: https://github.com/apache/kafka/pull/15109 This PR enables the remote build cache hosted by the ASF Gradle Enterprise instance. Only builds on `trunk` store task outputs in the cache, to ensure that pushes to PRs always re-run all tasks needed by that PR. PR builds read from the build cache, so tasks for modules that haven't changed in the PR can be restored from the cache. This should ensure that changes to "leaf" modules (e.g. connect, streams) don't re-run the entire test suites of the modules they depend on (e.g. core, clients, storage), which can be very costly.
Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]
divijvaidya merged PR #15093: URL: https://github.com/apache/kafka/pull/15093
Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]
divijvaidya commented on PR #15093: URL: https://github.com/apache/kafka/pull/15093#issuecomment-1874131838 The test changed in this PR is successful - https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15093/3/testReport/kafka.server/ControllerApisTest/
[jira] [Commented] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id
[ https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801811#comment-17801811 ] Phuc Hong Tran commented on KAFKA-16034: [~pnee], I don't think resetting the HeartbeatState is enough, as it would set rebalanceTimeoutMs to -1, which makes the request invalid because the member epoch was set to 0 during the fenced/unknown_member_id exception. We need to reassign the rebalanceTimeoutMs to sentFields. > AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on > fenced/unknown member Id > --- > > Key: KAFKA-16034 > URL: https://issues.apache.org/jira/browse/KAFKA-16034 > Project: Kafka > Issue Type: Sub-task >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Major > Fix For: 3.8.0 > > > The consumer will log an invalid request error when joining from a fenced/unknown > member id, because we didn't reset the HeartbeatState and we won't send the > needed fields (rebalanceTimeoutMs, for example) when joining.
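The Jira description says the rejoin omits fields such as `rebalanceTimeoutMs` because heartbeat state was not reset. The hypothetical class below models that failure mode in isolation; it is not the AsyncKafkaConsumer code and does not implement the exact change proposed in the comment. It shows why a heartbeat builder that skips fields it has already sent produces an invalid epoch-0 join unless that memory is cleared when the member is fenced.

```java
// Hypothetical illustration of the KAFKA-16034 failure mode (not AsyncKafkaConsumer code):
// heartbeat state that omits already-sent fields must forget them when the member rejoins.
class HeartbeatStateSketch {
    static final int OMITTED = -1; // field left out of the request

    private final int rebalanceTimeoutMs;
    private int memberEpoch = 0;
    private boolean rebalanceTimeoutSent = false;

    HeartbeatStateSketch(int rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    /** Value for the next request's rebalanceTimeoutMs field, or OMITTED if already sent. */
    int nextRebalanceTimeoutMs() {
        if (rebalanceTimeoutSent) {
            return OMITTED;
        }
        rebalanceTimeoutSent = true;
        return rebalanceTimeoutMs;
    }

    /** In this model, a join request (member epoch 0) must carry rebalanceTimeoutMs. */
    boolean isValidRequest(int rebalanceTimeoutField) {
        return memberEpoch != 0 || rebalanceTimeoutField != OMITTED;
    }

    void onJoined(int epoch) {
        memberEpoch = epoch;
    }

    /** Fenced or unknown member id: rejoin with epoch 0, optionally clearing sent-field memory. */
    void onFenced(boolean resetSentFields) {
        memberEpoch = 0;
        if (resetSentFields) {
            rebalanceTimeoutSent = false; // the join request will include the field again
        }
    }

    public static void main(String[] args) {
        HeartbeatStateSketch state = new HeartbeatStateSketch(30_000);
        state.nextRebalanceTimeoutMs(); // initial join sends the field
        state.onJoined(5);
        state.onFenced(false);
        int field = state.nextRebalanceTimeoutMs();
        System.out.println("rejoin valid without reset? " + state.isValidRequest(field));
    }
}
```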
Re: [PR] Bugfix: Prevent java.lang.UnsupportedOperationException. [kafka]
jamespfaulkner commented on code in PR #14955: URL: https://github.com/apache/kafka/pull/14955#discussion_r1439519451 ## clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java: ## @@ -1014,7 +1014,9 @@ synchronized public DescribeLogDirsResult describeLogDirs(Collection br for (Node node : nodes) { Map<String, LogDirDescription> logDirDescriptionMap = unwrappedResults.get(node.id()); LogDirDescription logDirDescription = logDirDescriptionMap.getOrDefault(partitionLogDirs.get(0), new LogDirDescription(null, new HashMap<>())); -logDirDescription.replicaInfos().put(new TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 0, false)); +Map<TopicPartition, ReplicaInfo> topicPartitionReplicaInfoMap = new HashMap<>(logDirDescription.replicaInfos()); +topicPartitionReplicaInfoMap.put(new TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 0, false)); +logDirDescriptionMap.put(partitionLogDirs.get(0), new LogDirDescription(logDirDescription.error(), topicPartitionReplicaInfoMap)); Review Comment: Good point, thanks for the feedback. I've made this change.
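The exception fixed here is the usual one for mutating a map exposed as a read-only view (presumably what `LogDirDescription.replicaInfos()` returns, given the PR title). A minimal, self-contained sketch of the copy-then-replace pattern used in the revised diff, with `String` keys standing in for `TopicPartition` and hypothetical class and method names:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: read-only views reject put(), so copy into a HashMap,
// modify the copy, and build a new value object from it.
class CopyBeforeMutate {

    /** Returns a new mutable map containing the original entries plus (key, value). */
    static Map<String, Integer> withEntry(Map<String, Integer> original, String key, int value) {
        Map<String, Integer> copy = new HashMap<>(original); // the original is left untouched
        copy.put(key, value);
        return copy;
    }

    /** True if put() throws UnsupportedOperationException (mutates the map if it succeeds). */
    static boolean putInPlaceFails(Map<String, Integer> map) {
        try {
            map.put("topic-0", 0);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

In the diff, the copy is then wrapped in a new `LogDirDescription` and stored back into `logDirDescriptionMap`, rather than mutating the existing description.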
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1874100254 @clolov Can we merge this PR?
[jira] [Commented] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer
[ https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801803#comment-17801803 ] Lan Ding commented on KAFKA-15777: -- Hi [~ckamal], could I pick this up? > Configurable remote fetch bytes per partition from Consumer > --- > > Key: KAFKA-15777 > URL: https://issues.apache.org/jira/browse/KAFKA-15777 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major
[PR] MINOR: Fix flaky test RemoteIndexCacheTest.testClose() [kafka]
DL1231 opened a new pull request, #15108: URL: https://github.com/apache/kafka/pull/15108 Test fails 2% of the time. https://ge.apache.org/scans/tests?search.timeZoneId=Europe/Berlin=kafka.log.remote.RemoteIndexCacheTest=testClose() This test should be modified to test assertTrue(cache.cleanerThread.isShutdownComplete) in a TestUtils.waitUntilTrue condition which will catch the InterruptedException and exit successfully on it. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
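The proposed fix replaces a one-shot assertion on a background thread's state with a polling wait. Kafka's Scala tests use `TestUtils.waitUntilTrue` for this; the Java helper below is a hypothetical stand-in that shows the idea: re-check the condition until it holds or a timeout expires, so a cleaner thread that finishes shutting down a few milliseconds late no longer fails the test.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper: re-evaluate a condition until it holds or the
// timeout expires, instead of asserting once and racing a background thread.
class WaitUntil {

    static void waitUntilTrue(BooleanSupplier condition, String message,
                              long timeoutMs, long pauseMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            Thread.sleep(pauseMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for cache.cleanerThread: it only finishes shutting down after a delay.
        Thread cleaner = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        cleaner.start();
        waitUntilTrue(() -> !cleaner.isAlive(), "cleaner thread did not shut down", 5_000, 10);
        System.out.println("cleaner shut down");
    }
}
```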
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
clolov commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1439460528 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: Apologies for the delay. Okay, this reasoning makes sense to me; even if I am wrong, the while loop won't have a real performance impact, so I am happy with it!
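With compaction, the segment chosen for the fetch offset may no longer contain any batch at or beyond that offset, which is why the new code keeps looping (`while (firstBatch == null && rlsMetadataOptional.isPresent())`) and moves on to the next segment. A simplified in-memory model of that search, using hypothetical `Batch` and segment-list types instead of the real remote-storage API:

```java
import java.util.List;
import java.util.Optional;

// Simplified model of the search (not RemoteLogManager code): segments are
// ordered by offset and hold only the batches that survived compaction.
class CompactedSegmentSearch {

    static final class Batch {
        final long baseOffset;
        final long lastOffset;

        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    /** First batch whose last offset is >= targetOffset, starting at startSegment. */
    static Optional<Batch> findFirstBatch(List<List<Batch>> segments, int startSegment, long targetOffset) {
        for (int i = startSegment; i < segments.size(); i++) {
            for (Batch batch : segments.get(i)) {
                if (batch.lastOffset >= targetOffset) {
                    return Optional.of(batch);
                }
            }
            // Compaction removed everything at or after targetOffset here: try the next segment.
        }
        return Optional.empty(); // nothing at or beyond targetOffset in remote storage
    }

    public static void main(String[] args) {
        // Segment 0 originally covered offsets 0-19, but batches 10-19 were compacted away.
        List<List<Batch>> segments = List.of(
                List.of(new Batch(0, 9)),
                List.of(new Batch(25, 30)));
        findFirstBatch(segments, 0, 12)
                .ifPresent(b -> System.out.println("fetch starts at batch " + b.baseOffset));
    }
}
```

As the reviewer notes, the extra iterations only happen when a segment yields no matching batch, so the common case still reads a single segment.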
Re: [PR] KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito [kafka]
clolov commented on code in PR #15106: URL: https://github.com/apache/kafka/pull/15106#discussion_r1439447074 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -642,11 +645,12 @@ public void shouldRestoreFromBeginningAndCheckCompletion() { @Test public void shouldCheckCompletionIfPositionLargerThanEndOffset() { +setupActiveStateManager(); Review Comment: Yeah, the reason for not doing it in @Before is that not all tests exercise the stubbed methods, so Mockito complains that there are unused stubs. This way we only stub in the tests where the code is exercised!
[jira] [Assigned] (KAFKA-15206) Flaky test RemoteIndexCacheTest.testClose()
[ https://issues.apache.org/jira/browse/KAFKA-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lan Ding reassigned KAFKA-15206: Assignee: Lan Ding > Flaky test RemoteIndexCacheTest.testClose() > --- > > Key: KAFKA-15206 > URL: https://issues.apache.org/jira/browse/KAFKA-15206 > Project: Kafka > Issue Type: Test >Reporter: Divij Vaidya >Assignee: Lan Ding >Priority: Minor > Labels: flaky-test > Fix For: 3.8.0 > > > Test fails 2% of the time. > [https://ge.apache.org/scans/tests?search.timeZoneId=Europe/Berlin=kafka.log.remote.RemoteIndexCacheTest=testClose()] > > This test should be modified to test > assertTrue(cache.cleanerThread.isShutdownComplete) in a > TestUtils.waitUntilTrue condition which will catch the InterruptedException > and exit successfully on it.
[PR] MINOR: Improve code style about producer [kafka]
DL1231 opened a new pull request, #15107: URL: https://github.com/apache/kafka/pull/15107 I was reading the producer code and found a couple of code style inconsistencies. This PR fixes them. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito [kafka]
lucasbru commented on code in PR #15106: URL: https://github.com/apache/kafka/pull/15106#discussion_r1439431364 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -642,11 +645,12 @@ public void shouldRestoreFromBeginningAndCheckCompletion() { @Test public void shouldCheckCompletionIfPositionLargerThanEndOffset() { +setupActiveStateManager(); Review Comment: I suppose the point of not doing this in @Before is to avoid unnecessary-mocking errors?
[PR] KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito [kafka]
clolov opened a new pull request, #15106: URL: https://github.com/apache/kafka/pull/15106 This pull request takes a similar approach to the ongoing TaskManagerTest migration: mocks are moved to Mockito one at a time, for easier reviews.
Re: [PR] KAFKA-14133: Migrate stateManager mock in StoreChangelogReaderTest to Mockito [kafka]
divijvaidya merged PR #14929: URL: https://github.com/apache/kafka/pull/14929
Re: [PR] KAFKA-14133: Migrate stateManager mock in StoreChangelogReaderTest to Mockito [kafka]
divijvaidya commented on PR #14929: URL: https://github.com/apache/kafka/pull/14929#issuecomment-1873974414 There are flaky tests in CI but the test changed in this PR is successful: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14929/2/testReport/org.apache.kafka.streams.processor.internals/StoreChangelogReaderTest/
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
showuon commented on PR #15101: URL: https://github.com/apache/kafka/pull/15101#issuecomment-1873956404 @wernerdv, thanks for the smart way to detect thread leaks! For now, the `core` module alone will produce thousands of test errors. We need to fix all the thread leaks before extending this to other modules. Thanks.
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
wernerdv commented on PR #15101: URL: https://github.com/apache/kafka/pull/15101#issuecomment-1873949674 @divijvaidya Thanks for the reply. Currently the extension runs only for modules that declare `testImplementation project(':core')`. Is this the expected behavior, or does it need improvement?
Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]
nicktelford commented on PR #15105: URL: https://github.com/apache/kafka/pull/15105#issuecomment-1873934211 @cadonna @mjsax @ableegoldman @lucasbru @wcarlson5 @bbejeck @vvcephei @guozhangwang This is part of KIP-892, and has been broken out into a separate PR to reduce the review burden on the main KIP-892 implementation, since it can be merged independently. There are no tests, because there are no behavioural changes, just a refactoring. The existing test suite should ensure no regressions.
[PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]
nicktelford opened a new pull request, #15105: URL: https://github.com/apache/kafka/pull/15105 To support future use cases that use different strategies for accessing RocksDB, we need to decouple the RocksDB access strategy from the column family access strategy. To do this, we now have two separate accessors:
* `DBAccessor`: dictates how we access RocksDB. Currently only one strategy is supported: `DirectDBAccessor`, which accesses RocksDB directly via the `RocksDB` class for all operations. In the future, a `BatchedDBAccessor` will be added, which enables transactions via `WriteBatch`.
* `ColumnFamilyAccessor`: maps StateStore operations to operations on one or more column families. This is a rename of the old `RocksDBDBAccessor`.
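The split described above is a strategy pattern: one interface decides *how* the database is touched, the other decides *which* column families a store operation maps to. A hypothetical, heavily simplified sketch of those two roles, with a nested `HashMap` standing in for RocksDB; the real interfaces work with RocksDB column family handles and `byte[]` keys and have more operations, and `SingleCfAccessor` is an illustrative name, not a class from the PR:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified shapes for the two accessors described in the PR.
// A nested HashMap stands in for RocksDB.

/** How the database itself is accessed (direct calls today, a WriteBatch later). */
interface DBAccessor {
    byte[] get(String columnFamily, String key);
    void put(String columnFamily, String key, byte[] value);
}

/** Strategy that applies every operation to the database immediately. */
class DirectDBAccessor implements DBAccessor {
    private final Map<String, Map<String, byte[]>> db = new HashMap<>();

    @Override
    public byte[] get(String columnFamily, String key) {
        Map<String, byte[]> cf = db.get(columnFamily);
        return cf == null ? null : cf.get(key);
    }

    @Override
    public void put(String columnFamily, String key, byte[] value) {
        db.computeIfAbsent(columnFamily, name -> new HashMap<>()).put(key, value);
    }
}

/** How store operations map onto column families; delegates the actual I/O. */
interface ColumnFamilyAccessor {
    byte[] get(DBAccessor db, String key);
    void put(DBAccessor db, String key, byte[] value);
}

/** Hypothetical accessor for a store backed by a single column family. */
class SingleCfAccessor implements ColumnFamilyAccessor {
    private final String columnFamily;

    SingleCfAccessor(String columnFamily) {
        this.columnFamily = columnFamily;
    }

    @Override
    public byte[] get(DBAccessor db, String key) {
        return db.get(columnFamily, key);
    }

    @Override
    public void put(DBAccessor db, String key, byte[] value) {
        db.put(columnFamily, key, value);
    }
}
```

Because the column-family logic only talks to `DBAccessor`, a batching implementation could later be swapped in without changing it.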
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
tinaselenge commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1439373207 ## clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java: ## @@ -83,27 +89,27 @@ public void close() throws IOException { @Test public void testGetAllKeysAtPath() { -ConfigData configData = provider.get(dir.getAbsolutePath()); Review Comment: This resulted from using `java.nio.file.Files` instead of `File`, so we can no longer call `getAbsolutePath()` to pass in a string argument. The test variable types were changed to `String` to avoid having to do string conversions everywhere. I didn't think changing test variable types would break compatibility, since we are still passing the same type of variable to the calling method.
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
tinaselenge commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1439364385 ## clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java: ## @@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() { public static class TestFileConfigProvider extends FileConfigProvider { @Override -protected Reader reader(String path) throws IOException { +protected Reader reader(Path path) throws IOException { return new StringReader("testKey=testResult\ntestKey2=testResult2"); } } + +@Test +public void testAllowedDirPath() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dir); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(dirFile); +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testAllowedFilePath() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(dirFile); +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testMultipleAllowedPaths() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir); +configProvider.configure(configs); + +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); + +ConfigData configData = configProvider.get(dirFile); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); + +configData = configProvider.get(siblingDirFile); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testNotAllowedDirPath() { +Map configs = new HashMap<>(); 
+configs.put(ALLOWED_PATHS_CONFIG, dir); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(siblingDirFile); +assertTrue(configData.data().isEmpty()); +assertNull(configData.ttl()); +} + +@Test +public void testNotAllowedFilePath() throws IOException { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +//another file under the same directory +Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2")); +ConfigData configData = configProvider.get(dirFile2.toString()); +assertTrue(configData.data().isEmpty()); +assertNull(configData.ttl()); +} + +@Test +public void testNoTraversal() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +// Check we can't escape outside the path directory +ConfigData configData = configProvider.get(dir + "../siblingdir/siblingdirFile"); Review Comment: good catch! I have fixed these now.
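The new tests cover three cases: files inside an allowed directory, an explicitly allowed file, and attempts to escape with `..`. A minimal sketch of a check that would satisfy them, assuming the allowed paths arrive as the comma-separated value the tests pass under `ALLOWED_PATHS_CONFIG`. This is not the `FileConfigProvider` implementation, and the class and method names are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of an allow-list check (not the FileConfigProvider implementation).
// Paths are made absolute and normalized before comparison, so ".." segments
// cannot be used to escape an allowed directory.
class AllowedPathsSketch {
    private final List<Path> allowedPaths;

    AllowedPathsSketch(String commaSeparatedPaths) {
        this.allowedPaths = Arrays.stream(commaSeparatedPaths.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(s -> Paths.get(s).toAbsolutePath().normalize())
                .collect(Collectors.toList());
    }

    /** True if the requested file is an allowed path or lies underneath one. */
    boolean isAllowed(String requested) {
        Path path = Paths.get(requested).toAbsolutePath().normalize();
        // Path.startsWith compares whole name elements: "/a/dir" does not match "/a/dirFile".
        return allowedPaths.stream().anyMatch(path::startsWith);
    }
}
```

`normalize()` is purely lexical and does not resolve symbolic links; a stricter check could compare `toRealPath()` results instead, at the cost of requiring the files to exist.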
[jira] [Commented] (KAFKA-16074) Fix thread leaks in ReplicaManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-16074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801744#comment-17801744 ] Divij Vaidya commented on KAFKA-16074: -- https://github.com/apache/kafka/pull/15077 > Fix thread leaks in ReplicaManagerTest > -- > > Key: KAFKA-16074 > URL: https://issues.apache.org/jira/browse/KAFKA-16074 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Following [@dajac|https://github.com/dajac]'s finding in > [#15063|https://github.com/apache/kafka/pull/15063], I found we also create > new RemoteLogManager instances in ReplicaManagerTest but don't close them. > While investigating ReplicaManagerTest, I also found other threads > leaking: > # Remote fetch reaper thread. We create a real reaper thread in the > test, which is not expected. We should create a mocked one, like the other > purgatory instances. > # Throttle threads. We created a {{quotaManager}} to feed into the > replicaManager, but didn't close it. Actually, we have already created a global > {{quotaManager}} instance that is closed in {{AfterEach}}. We should > reuse it. > # replicaManager and logManager didn't invoke {{close}} after the test.
[jira] [Created] (KAFKA-16074) Fix thread leaks in ReplicaManagerTest
Divij Vaidya created KAFKA-16074: Summary: Fix thread leaks in ReplicaManagerTest Key: KAFKA-16074 URL: https://issues.apache.org/jira/browse/KAFKA-16074 Project: Kafka Issue Type: Sub-task Reporter: Luke Chen Assignee: Luke Chen
[jira] [Updated] (KAFKA-16072) Create Junit 5 extension to detect thread leak
[ https://issues.apache.org/jira/browse/KAFKA-16072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16072: - Fix Version/s: 3.8.0 > Create Junit 5 extension to detect thread leak > -- > > Key: KAFKA-16072 > URL: https://issues.apache.org/jira/browse/KAFKA-16072 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Divij Vaidya >Assignee: Dmitry Werner >Priority: Major > Labels: newbie++ > Fix For: 3.8.0 > > > The objective of this task is to create a JUnit extension that will execute > after every test and verify that there are no lingering threads left over. > An example of how to create an extension can be found here: > [https://github.com/apache/kafka/pull/14783/files#diff-812cfc2780b6fc0e7a1648ff37912ff13aeda4189ea6b0d4d847b831f66e56d1] > An example of how to find unexpected threads is at > [https://github.com/apache/kafka/blob/d5aa341a185f4df23bf587e55bcda4f16fc511f1/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2427] > and also at > https://issues.apache.org/jira/browse/KAFKA-16052?focusedCommentId=17800978=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17800978 >
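The core of the check requested above can be sketched with plain JDK calls: snapshot the live threads before a test, then report threads that appeared since and are still alive. In a JUnit 5 extension these two calls would sit in `BeforeEachCallback`/`AfterEachCallback` implementations. The class below is hypothetical and leaves out the allow-list of expected JVM and daemon threads that a real check would need to avoid false positives.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical core of a thread-leak check: snapshot live threads before a test,
// then report threads that appeared since and are still alive.
class ThreadLeakCheck {

    /** Snapshot of currently live threads (call before the test). */
    static Set<Thread> liveThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .collect(Collectors.toCollection(HashSet::new));
    }

    /** Names of live threads that were not in the earlier snapshot (call after the test). */
    static Set<String> leakedSince(Set<Thread> before) {
        return liveThreads().stream()
                .filter(t -> !before.contains(t))
                .map(Thread::getName)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<Thread> before = liveThreads();
        // ... the test body would run here ...
        System.out.println("Leaked threads: " + leakedSince(before));
    }
}
```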
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
divijvaidya commented on PR #15101: URL: https://github.com/apache/kafka/pull/15101#issuecomment-1873898677 Thank you for the change, @wernerdv. As a next step, we need to fix the leaks detected by this PR before merging it. I already know of at least two PRs that will help: https://github.com/apache/kafka/pull/15093 and https://github.com/apache/kafka/pull/15077. Let's wait for them to merge first; then we can rebase this on trunk and run again.
Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]
divijvaidya commented on PR #15093: URL: https://github.com/apache/kafka/pull/15093#issuecomment-1873868215 Thank you for the review @showuon. I believe I have fixed all indentation problems now.
Re: [PR] MINOR: Upgrade Zstd-jni to 1.5.5-11 [kafka]
divijvaidya merged PR #14798: URL: https://github.com/apache/kafka/pull/14798
Re: [PR] KAFKA-9545: Fix IllegalStateException in updateLags [kafka]
lucasbru commented on PR #15096: URL: https://github.com/apache/kafka/pull/15096#issuecomment-1873828040 > I do not completely understand how the consumer subscription can reflect the topic deletion while Streams has still references to the deleted topic. The update to the subscription of the consumer happens right after the call to `onAssignment()` [1]. Both calls should happen inside the call to `poll()`. What do I miss? In every iteration of the poll loop of the `ConsumerCoordinator` we update the subscription in `maybeUpdateSubscriptionMetadata`, based on the latest metadata known to the consumer - which will reflect the topic deletion and update the subscription. The subscription is used to update the assignment in `onJoinPrepare` and `onLeavePrepare`. https://github.com/apache/kafka/blob/2b99d0e45027c88ae2347fa8f7d1ff4b2b919089/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L801 https://github.com/apache/kafka/blob/2b99d0e45027c88ae2347fa8f7d1ff4b2b919089/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L857 The partitions in the `RecordCollector` will only be updated when we have a new assignment, so when in state `PREPARE_SHUTDOWN` or `PARTITIONS_REVOKED` the `RecordCollector` may have `TopicPartition`s that we do not technically own anymore.
Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]
showuon commented on code in PR #15093: URL: https://github.com/apache/kafka/pull/15093#discussion_r1439274094

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
## @@ -2055,8 +2058,8 @@ class KafkaApisTest {
responseCallback.capture(),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
-
- createKafkaApis().handleInitProducerIdRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleInitProducerIdRequest(request, requestLocal)

Review Comment: Is the indent correct?
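The diff replaces the inline `createKafkaApis().handle...(...)` call with an assignment to a `kafkaApis` variable. Presumably (this is an inference from the PR title, not stated in the comments) this keeps a reference so that a teardown hook can close the instance and stop the threads it owns. The sketch below shows that fixture pattern in plain Java under stated assumptions: `ApiHandler`, `createHandler`, `runTest`, and `tearDown` are hypothetical names, and `tearDown` stands in for a JUnit `@AfterEach` method. It is not the actual `KafkaApisTest` code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadLeakFixtureExample {

    // Hypothetical stand-in for an object (like KafkaApis in the test) that
    // owns a background thread and must be closed to release it.
    static final class ApiHandler implements AutoCloseable {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        String handle(String request) throws Exception {
            return executor.submit(() -> "handled " + request).get();
        }

        boolean isClosed() {
            return executor.isShutdown();
        }

        @Override
        public void close() throws InterruptedException {
            executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    // Fixture field: each test stores the instance it creates here instead of
    // calling createHandler().handle(...) inline and losing the reference.
    ApiHandler handler;

    ApiHandler createHandler() {
        return new ApiHandler();
    }

    String runTest(String request) throws Exception {
        handler = createHandler();
        return handler.handle(request);
    }

    // Plays the role of an @AfterEach teardown: closes whatever the test created.
    void tearDown() throws InterruptedException {
        if (handler != null) {
            handler.close();
            handler = null;
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadLeakFixtureExample test = new ThreadLeakFixtureExample();
        System.out.println(test.runTest("InitProducerId"));
        ApiHandler created = test.handler;
        test.tearDown();
        System.out.println("closed after teardown: " + created.isClosed());
    }
}
```

With the inline `createHandler().handle(...)` form, nothing would hold the instance after the call, so its executor thread would outlive the test; storing it in a field gives the teardown something to close.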
Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]
showuon commented on code in PR #15093: URL: https://github.com/apache/kafka/pull/15093#discussion_r1439274938

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
## @@ -2410,8 +2413,8 @@ class KafkaApisTest {
responseCallback.capture(),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
- createKafkaApis().handleEndTxnRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleEndTxnRequest(request, requestLocal)

Review Comment: Here, and in some places below.

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
## @@ -2055,8 +2058,8 @@ class KafkaApisTest {
responseCallback.capture(),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
-
- createKafkaApis().handleInitProducerIdRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleInitProducerIdRequest(request, requestLocal)

Review Comment: Is the indent correct?

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
## @@ -2167,8 +2170,8 @@ class KafkaApisTest {
responseCallback.capture(),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
- createKafkaApis().handleAddPartitionsToTxnRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleAddPartitionsToTxnRequest(request, requestLocal)

Review Comment: Ditto.

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
## @@ -2113,8 +2116,8 @@ class KafkaApisTest {
responseCallback.capture(),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
- createKafkaApis().handleAddOffsetsToTxnRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleAddOffsetsToTxnRequest(request, requestLocal)

Review Comment: Ditto.