[jira] [Created] (KAFKA-16885) Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to RemoteLogManagerConfig#isRemoteStorageSystemEnabled
Chia-Ping Tsai created KAFKA-16885: -- Summary: Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to RemoteLogManagerConfig#isRemoteStorageSystemEnabled Key: KAFKA-16885 URL: https://issues.apache.org/jira/browse/KAFKA-16885 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai see the discussion: https://github.com/apache/kafka/pull/16153#issuecomment-2144269279 -- This message was sent by Atlassian Jira (v8.20.10#820010)
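The rename proposed above follows the usual convention of prefixing boolean getters with `is`. A minimal, purely illustrative sketch of the before/after shape of such an accessor (the class body below is a stand-in, not the actual RemoteLogManagerConfig code):

```java
public class RemoteStorageConfigExample {
    // Hypothetical backing field; the real config derives this value from a broker property.
    private final boolean remoteStorageSystemEnabled;

    public RemoteStorageConfigExample(boolean remoteStorageSystemEnabled) {
        this.remoteStorageSystemEnabled = remoteStorageSystemEnabled;
    }

    // Before: enableRemoteStorageSystem() reads like an action rather than a query.
    // After: isRemoteStorageSystemEnabled() makes the boolean-getter intent explicit.
    public boolean isRemoteStorageSystemEnabled() {
        return remoteStorageSystemEnabled;
    }
}
```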
Re: [PR] KAFKA-16859: Cleanup check if tiered storage is enabled [kafka]
chia7712 commented on PR #16153: URL: https://github.com/apache/kafka/pull/16153#issuecomment-2146674263 I filed the ticket https://issues.apache.org/jira/browse/KAFKA-16885. We can have more discussion there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10787: Update spotless version and remove support JDK8 [kafka]
chia7712 commented on code in PR #16176: URL: https://github.com/apache/kafka/pull/16176#discussion_r1625383681 ## build.gradle: ## @@ -47,9 +47,11 @@ plugins { // Updating the shadow plugin version to 8.1.1 causes issue with signing and publishing the shadowed // artifacts - see https://github.com/johnrengelman/shadow/issues/901 id 'com.github.johnrengelman.shadow' version '8.1.0' apply false - // the minimum required JRE of 6.14.0+ is 11 + // the spotless before 6.14.0 support JDK8, after 6.23.3 support JDK21 Review Comment: ``` Spotless 6.13.0 has an issue with Java 21 (see https://github.com/diffplug/spotless/pull/1920), and Spotless 6.14.0+ requires JRE 11. We are going to drop JDK 8 support. Hence, Spotless is upgraded to the newest version and is applied only if the build environment is compatible with JDK 11. ``` WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] (KAFKA-4094) Fix importance labels for Kafka Server config
[ https://issues.apache.org/jira/browse/KAFKA-4094 ] Abhi deleted comment on KAFKA-4094: - was (Author: JIRAUSER305362): Hi [~jkreps] Can you describe this issue more and also mention which component it belongs to? As per my understanding, you need the server configs like: HIGH - broker.id, log.dirs, num.partitions MEDIUM - log.segment.bytes, log.cleanup.policy LOW - message.max.bytes, replica.fetch.max.bytes > Fix importance labels for Kafka Server config > - > > Key: KAFKA-4094 > URL: https://issues.apache.org/jira/browse/KAFKA-4094 > Project: Kafka > Issue Type: Bug >Reporter: Jay Kreps >Priority: Major > Labels: newbie > > We have > 100 server configs. The importance label is meant to help people > navigate this in a sane way. The intention is something like the following: > HIGH - things you must think about and set > MEDIUM - things you don't necessarily need to set but that you might want to > tune > LOW - things you probably don't need to set > Currently we have a gazillion things marked as high including very subtle > tuning params and also things marked as deprecated (which probably should be > its own importance level). This makes it really hard for people to figure out > which configurations to actually learn about and use. -- This message was sent by Atlassian Jira (v8.20.10#820010)
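For context on how importance labels are attached to configs: each definition passed to ConfigDef carries an Importance value, and the generated documentation groups configs by that label. A small sketch, with the groupings taken from the comment above purely as illustration (these are not the actual broker definitions):

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ImportanceLabelExample {
    // The Importance argument is what drives the HIGH/MEDIUM/LOW grouping discussed in KAFKA-4094.
    static final ConfigDef EXAMPLE_DEF = new ConfigDef()
            .define("broker.id", Type.INT, Importance.HIGH, "The broker id for this server.")
            .define("log.segment.bytes", Type.INT, Importance.MEDIUM, "The maximum size of a single log file.")
            .define("message.max.bytes", Type.INT, Importance.LOW, "The largest allowed record batch size.");
}
```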
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on PR #16068: URL: https://github.com/apache/kafka/pull/16068#issuecomment-2146608185
**Trunk**
```
Benchmark                                (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build               1                         10           100  avgt    5  4.292 ± 0.132  ms/op
TargetAssignmentBuilderBenchmark.build               1                         10          1000  avgt    5  5.477 ± 1.292  ms/op
```
**Patch**
```
Benchmark                                (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build               1                         10           100  avgt    5  4.701 ± 0.257  ms/op
TargetAssignmentBuilderBenchmark.build               1                         10          1000  avgt    5  5.492 ± 0.269  ms/op
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
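The numbers above come from a JMH benchmark. For readers unfamiliar with the output format, a skeleton of how such a benchmark is typically declared is shown below; the class name mirrors the one in the results, but the body is a placeholder, not the real assignment-building code:

```java
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@Fork(1)
public class TargetAssignmentBuilderBenchmarkSketch {

    @Param({"1"})
    int memberCount;

    @Param({"10"})
    int partitionsToMemberRatio;

    @Param({"100", "1000"})
    int topicCount;

    @Benchmark
    public int build() {
        // Placeholder work; the real benchmark builds a target assignment for the parameter combination.
        return memberCount * partitionsToMemberRatio * topicCount;
    }
}
```
The `avgt` column is the average time per invocation over `Cnt` measurement iterations, which is how the ms/op scores above should be read.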
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on PR #16068: URL: https://github.com/apache/kafka/pull/16068#issuecomment-2146607344 Trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ReassignPartitionsIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15675: URL: https://github.com/apache/kafka/pull/15675#issuecomment-2146578561 @chia7712 I addressed all comments. Thanks for the review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException isCancelled(), isLeader()); return; } + +copyQuotaManagerLock.lock(); +try { +while (rlmCopyQuotaManager.isQuotaExceeded()) { Review Comment: One drawback of the current approach is that the thread-pool will be shown as all "busy" in the metrics when the threads are waiting for the quota to be available. It can raise false alarms if a user configures a 100% thread-pool usage alert. We can mention this behavior in the docs section. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException isCancelled(), isLeader()); return; } + +copyQuotaManagerLock.lock(); +try { +while (rlmCopyQuotaManager.isQuotaExceeded()) { Review Comment: One drawback of the current approach is that the thread-pool will be shown as all "busy" in the metrics. It can raise false alarms if a user configures an alert on top of it. We can mention this behavior in the docs section. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix typo in MetadataVersion.IBP_4_0_IV0 [kafka]
jolshan commented on PR #16181: URL: https://github.com/apache/kafka/pull/16181#issuecomment-2146534437 Cherrypicked to 3.8 as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException isCancelled(), isLeader()); return; } + +copyQuotaManagerLock.lock(); +try { +while (rlmCopyQuotaManager.isQuotaExceeded()) { Review Comment: @abhijeetk88 One drawback of the current approach is that the thread-pool will be shown as all "busy" in the metrics. Could you confirm whether the threads are "shown as" busy when they are in `await`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
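The pattern being discussed in these comments is a wait-under-lock loop: the copier thread holds a lock and parks on a condition while the copy quota is exceeded, so from the thread-pool's perspective it still counts as busy. A self-contained sketch of that pattern follows; the interface and names below are stand-ins, not the exact fields from PR #15820:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Stand-in for the real RLMQuotaManager; only the method used by the sketch is modeled.
interface QuotaManager {
    boolean isQuotaExceeded();
}

public class CopyQuotaWaitExample {
    private final ReentrantLock copyQuotaManagerLock = new ReentrantLock();
    private final Condition quotaAvailable = copyQuotaManagerLock.newCondition();

    // The copier thread parks here while the quota is exceeded, which is why a
    // thread-pool metric would report it as "busy" even though it is only waiting.
    void waitUntilQuotaAvailable(QuotaManager quotaManager) throws InterruptedException {
        copyQuotaManagerLock.lock();
        try {
            while (quotaManager.isQuotaExceeded()) {
                // Re-check the quota periodically; an interrupt propagates to the caller.
                quotaAvailable.await(1, TimeUnit.SECONDS);
            }
        } finally {
            copyQuotaManagerLock.unlock();
        }
    }
}
```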
Re: [PR] MINOR: Fix typo in MetadataVersion.IBP_4_0_IV0 [kafka]
jolshan merged PR #16181: URL: https://github.com/apache/kafka/pull/16181 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]
kamalcph commented on PR #15241: URL: https://github.com/apache/kafka/pull/15241#issuecomment-2146529487 @jeqo Any updates on this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16882: Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra [kafka]
brandboat commented on code in PR #16180: URL: https://github.com/apache/kafka/pull/16180#discussion_r1625299519 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java: ## @@ -197,333 +206,159 @@ [quoted diff hunk: the test's RemoteLogSegmentLifecycleManager calls are replaced with direct RemoteLogMetadataManager/RemotePartitionMetadataStore usage, createSegmentUpdateWithState becomes upsertSegmentState, and the private EpochOffset helper class is removed; the accompanying review comment is missing]
Re: [PR] KAFKA-16882: Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra [kafka]
kamalcph commented on code in PR #16180: URL: https://github.com/apache/kafka/pull/16180#discussion_r1625298782 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java: ## @@ -197,333 +206,159 @@ [quoted diff hunk: the test's RemoteLogSegmentLifecycleManager calls are replaced with direct RemoteLogMetadataManager/RemotePartitionMetadataStore usage, createSegmentUpdateWithState becomes upsertSegmentState, and the private EpochOffset helper class is removed; the accompanying review comment is missing]
Re: [PR] KAFKA-16715: Create KafkaShareConsumer interfaces [kafka]
omkreddy merged PR #16134: URL: https://github.com/apache/kafka/pull/16134 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR; Log reason for deletion of KRaft snapshot [kafka]
github-actions[bot] commented on PR #15450: URL: https://github.com/apache/kafka/pull/15450#issuecomment-2146521360 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1625290704 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException isCancelled(), isLeader()); return; } + +copyQuotaManagerLock.lock(); +try { +while (rlmCopyQuotaManager.isQuotaExceeded()) { +logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available."); +// If the thread gets interrupted while waiting, the InterruptedException is thrown +// back to the caller. It's important to note that the task being executed is already +// cancelled before the executing thread is interrupted. The caller is responsible +// for handling the exception gracefully by checking if the task is already cancelled. Review Comment: Could you cover shutdown when the quota gets breached as a unit test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
showuon commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1625271623 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() { [quoted test code: a new parameterized testCopyQuota(boolean quotaExceeded) that mocks log segments and remote storage, stubs rlmCopyQuotaManager.isQuotaExceeded(), and asserts that copyLogSegmentsToRemote either times out or proceeds depending on the quota state; the accompanying review comment is missing]
Re: [PR] KAFKA-10787: Update spotless version and remove support JDK8 [kafka]
gongxuanzhang commented on PR #16176: URL: https://github.com/apache/kafka/pull/16176#issuecomment-2146474883 @chia7712 plz take a look -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15355: Message schema changes [kafka]
tisonkun commented on code in PR #14290: URL: https://github.com/apache/kafka/pull/14290#discussion_r1625260173 ## clients/src/main/resources/common/message/BrokerHeartbeatRequest.json: ## @@ -30,6 +30,8 @@ { "name": "WantFence", "type": "bool", "versions": "0+", "about": "True if the broker wants to be fenced, false otherwise." }, { "name": "WantShutDown", "type": "bool", "versions": "0+", - "about": "True if the broker wants to be shut down, false otherwise." } + "about": "True if the broker wants to be shut down, false otherwise." }, +{ "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+", "taggedVersions": "1+", "tag": "0", Review Comment: Shall this `"tag": "0"` be `"tag": 0`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15305: The background thread should try to process the remaining task until the shutdown timer is expired. [kafka]
frankvicky commented on code in PR #16156: URL: https://github.com/apache/kafka/pull/16156#discussion_r1625253815 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -293,12 +293,10 @@ private void closeInternal(final Duration timeout) { * Check the unsent queue one last time and poll until all requests are sent or the timer runs out. */ private void sendUnsentRequests(final Timer timer) { -if (networkClientDelegate.unsentRequests().isEmpty()) -return; -do { +while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests()) { Review Comment: Hi @lianetm, thank you for pointing out this potential bug, I will fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
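The change under review replaces a do/while over only the unsent queue with a loop that keeps polling until either the timer expires or no pending requests remain. A minimal sketch of that drain-until-timeout shape, using stand-in interfaces rather than the real consumer internals:

```java
// Minimal stand-ins: only the calls used by the loop are modeled here.
interface CountdownTimer {
    boolean notExpired();
    long remainingMs();
    long currentTimeMs();
    void update();
}

interface ClientDelegate {
    boolean hasAnyPendingRequests();
    void poll(long timeoutMs, long currentTimeMs);
}

public class SendUnsentRequestsExample {
    // Poll until all pending requests are flushed or the close timer runs out,
    // mirroring the loop condition discussed in the review above.
    static void sendUnsentRequests(CountdownTimer timer, ClientDelegate delegate) {
        while (timer.notExpired() && delegate.hasAnyPendingRequests()) {
            delegate.poll(timer.remainingMs(), timer.currentTimeMs());
            timer.update();
        }
    }
}
```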
Re: [PR] KAFKA-16882: Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra [kafka]
brandboat commented on code in PR #16180: URL: https://github.com/apache/kafka/pull/16180#discussion_r1625252312 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java: ## @@ -197,333 +206,159 @@ [quoted diff hunk: the test's RemoteLogSegmentLifecycleManager calls are replaced with direct RemoteLogMetadataManager/RemotePartitionMetadataStore usage, createSegmentUpdateWithState becomes upsertSegmentState, and the private EpochOffset helper class is removed; the accompanying review comment is missing]
[jira] [Commented] (KAFKA-16879) SystemTime should use singleton mode
[ https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851864#comment-17851864 ] jiandu commented on KAFKA-16879: Thank you. > SystemTime should use singleton mode > > > Key: KAFKA-16879 > URL: https://issues.apache.org/jira/browse/KAFKA-16879 > Project: Kafka > Issue Type: Improvement >Reporter: jiandu >Assignee: jiandu >Priority: Minor > > Currently, the {{SystemTime}} class, which provides system time-related > functionalities such as getting the current timestamp, sleep, and await, can be > instantiated multiple times. > However, system time is unique: in an application, the time obtained in > different places should be consistent, and the time obtained by using > the Java System class to interact with the underlying layer is already the same. > So I suggest changing it to a singleton, to reflect the uniqueness of > system time in the design. > > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1625229200 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -9863,8 +9863,8 @@ public void testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw // Newly joining member 3 bumps the group epoch. A new target assignment is computed. CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 1), -CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)), +CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), Review Comment: Oh yes it's exactly that! Thank you so much! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]
gharris1727 commented on code in PR #16179: URL: https://github.com/apache/kafka/pull/16179#discussion_r1625228386 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -30,7 +30,7 @@ */ public interface Time { -Time SYSTEM = new SystemTime(); +Time SYSTEM = SystemTime.getInstance(); Review Comment: I would be inclined to keep the existing `Time.SYSTEM` and avoid the deprecation. SystemTime can then be package-local entirely. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16879) SystemTime should use singleton mode
[ https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851862#comment-17851862 ] 黃竣陽 edited comment on KAFKA-16879 at 6/4/24 1:37 AM: - Pardon me, I missed that you want to handle this issue yourself; I will close my PR. I'm so sorry. was (Author: JIRAUSER305187): Sorry, I missed that you want to handle this issue yourself; I will close my PR. I'm so sorry. > SystemTime should use singleton mode > > > Key: KAFKA-16879 > URL: https://issues.apache.org/jira/browse/KAFKA-16879 > Project: Kafka > Issue Type: Improvement >Reporter: jiandu >Assignee: jiandu >Priority: Minor > > Currently, the {{SystemTime}} class, which provides system time-related > functionalities such as getting the current timestamp, sleep, and await, can be > instantiated multiple times. > However, system time is unique: in an application, the time obtained in > different places should be consistent, and the time obtained by using > the Java System class to interact with the underlying layer is already the same. > So I suggest changing it to a singleton, to reflect the uniqueness of > system time in the design. > > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-10787: Update spotless version and remove support JDK8 [kafka]
gongxuanzhang commented on code in PR #16176: URL: https://github.com/apache/kafka/pull/16176#discussion_r1625220935 ## build.gradle: ## @@ -799,7 +800,7 @@ subprojects { skipConfigurations = [ "zinc" ] } - if (project.name in spotlessApplyModules) { + if (JavaVersion.current().isJava11Compatible() && (project.name in spotlessApplyModules)) { apply plugin: 'com.diffplug.spotless' Review Comment: Yes, good idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10787: Apply spotless to transaction-coordinator and server-common [kafka]
gongxuanzhang commented on PR #16172: URL: https://github.com/apache/kafka/pull/16172#issuecomment-2146397207 @chia7712 complete -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16879) SystemTime should use singleton mode
[ https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851862#comment-17851862 ] 黃竣陽 commented on KAFKA-16879: - Sorry, I missed that you want to handle this issue yourself; I will close my PR. I'm so sorry. > SystemTime should use singleton mode > > > Key: KAFKA-16879 > URL: https://issues.apache.org/jira/browse/KAFKA-16879 > Project: Kafka > Issue Type: Improvement >Reporter: jiandu >Assignee: jiandu >Priority: Minor > > Currently, the {{SystemTime}} class, which provides system time-related > functionalities such as getting the current timestamp, sleep, and await, can be > instantiated multiple times. > However, system time is unique: in an application, the time obtained in > different places should be consistent, and the time obtained by using > the Java System class to interact with the underlying layer is already the same. > So I suggest changing it to a singleton, to reflect the uniqueness of > system time in the design. > > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]
m1a2st closed pull request #16179: KAFKA-16879: SystemTime should use singleton mode URL: https://github.com/apache/kafka/pull/16179 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]
m1a2st commented on code in PR #16179: URL: https://github.com/apache/kafka/pull/16179#discussion_r1625217653 ## clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java: ## @@ -26,6 +26,18 @@ */ public class SystemTime implements Time { +/** + * use the static inner class to implement the singleton pattern + * to avoid the overhead of synchronization Review Comment: True, `SystemTime` is very cheap, but the intent was to avoid double-checked locking when implementing the singleton pattern; maybe it would be better to remove this comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]
m1a2st commented on code in PR #16179: URL: https://github.com/apache/kafka/pull/16179#discussion_r1625215483 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -30,7 +30,7 @@ */ public interface Time { -Time SYSTEM = new SystemTime(); +Time SYSTEM = SystemTime.getInstance(); Review Comment: You are right, I should deprecate the `Time.SYSTEM` property and use `SystemTime.getInstance()` instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]
junrao commented on code in PR #15993: URL: https://github.com/apache/kafka/pull/15993#discussion_r1625209745 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -305,22 +341,23 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. + * + * Checkpoint-flushing is done asynchronously. */ public void truncateFromEnd(long endOffset) { lock.writeLock().lock(); try { -Optional epochEntry = latestEntry(); -if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { -List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - -// We intentionally don't force flushing change to the device here because: +List removedEntries = truncateFromEnd(epochs, endOffset); +if (!removedEntries.isEmpty()) { +// We flush the change to the device in the background because: // - To avoid fsync latency // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives // * This method is called by ReplicaFetcher threads, which could block replica fetching // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. -// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by -// another truncateFromEnd call on log loading procedure so it won't be a problem -writeToFile(false); +// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. +// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by +// another truncateFromEnd call on log loading procedure, so it won't be a problem +scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); Review Comment: Oh, I didn't see the partition after the prefix. So, this is fine then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]
gharris1727 commented on code in PR #16179: URL: https://github.com/apache/kafka/pull/16179#discussion_r1625208358 ## clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java: ## @@ -26,6 +26,18 @@ */ public class SystemTime implements Time { +/** + * use the static inner class to implement the singleton pattern + * to avoid the overhead of synchronization Review Comment: Can you explain this comment? I don't know what it's referring to, as none of this code is synchronized. The static inner class seems to be a lazy initialization trick, which is ineffective here (due to Time.SYSTEM) and unneeded (since the SystemTime constructor is very cheap). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
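The "static inner class" referred to above is the initialization-on-demand holder idiom: the JVM guarantees the holder class is loaded, and the instance created, only on the first call to getInstance(), without explicit synchronization or double-checked locking. A generic sketch of the idiom, not the actual Kafka SystemTime code:

```java
// Stand-in interface so the sketch is self-contained; Kafka's real Time interface has more methods.
interface TimeExample {
    long milliseconds();
}

public class SystemTimeExample implements TimeExample {

    private SystemTimeExample() { }

    // Loaded lazily by the JVM on first access to Holder.INSTANCE; class initialization
    // is thread-safe, so no synchronized block or volatile field is needed.
    private static class Holder {
        private static final SystemTimeExample INSTANCE = new SystemTimeExample();
    }

    public static SystemTimeExample getInstance() {
        return Holder.INSTANCE;
    }

    @Override
    public long milliseconds() {
        return System.currentTimeMillis();
    }
}
```
As the comment above notes, the laziness buys little once a constant like `Time.SYSTEM` forces eager creation anyway and the constructor is trivial.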
[jira] [Comment Edited] (KAFKA-16879) SystemTime should use singleton mode
[ https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851860#comment-17851860 ] jiandu edited comment on KAFKA-16879 at 6/4/24 1:07 AM: [~m1a2st] Sorry, you should choose issues that have no assignee or have not been handled for a long time. I need to handle this issue myself. was (Author: JIRAUSER305334): Sorry, you should choose issues that have no assignee or have not been handled for a long time. I need to handle this issue myself. > SystemTime should use singleton mode > > > Key: KAFKA-16879 > URL: https://issues.apache.org/jira/browse/KAFKA-16879 > Project: Kafka > Issue Type: Improvement >Reporter: jiandu >Assignee: jiandu >Priority: Minor > > Currently, the {{SystemTime}} class, which provides system time-related > functionalities such as getting the current timestamp, sleep, and await, can be > instantiated multiple times. > However, system time is unique: in an application, the time obtained in > different places should be consistent, and the time obtained by using > the Java System class to interact with the underlying layer is already the same. > So I suggest changing it to a singleton, to reflect the uniqueness of > system time in the design. > > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16879) SystemTime should use singleton mode
[ https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851860#comment-17851860 ] jiandu commented on KAFKA-16879: Sorry, you should choose issues that have no assignee or have not been handled for a long time. I need to handle this issue myself. > SystemTime should use singleton mode > > > Key: KAFKA-16879 > URL: https://issues.apache.org/jira/browse/KAFKA-16879 > Project: Kafka > Issue Type: Improvement >Reporter: jiandu >Assignee: jiandu >Priority: Minor > > Currently, the {{SystemTime}} class, which provides system time-related > functionalities such as getting the current timestamp, sleep, and await, can be > instantiated multiple times. > However, system time is unique: in an application, the time obtained in > different places should be consistent, and the time obtained by using > the Java System class to interact with the underlying layer is already the same. > So I suggest changing it to a singleton, to reflect the uniqueness of > system time in the design. > > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]
gharris1727 commented on code in PR #16179: URL: https://github.com/apache/kafka/pull/16179#discussion_r1625205338 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -30,7 +30,7 @@ */ public interface Time { -Time SYSTEM = new SystemTime(); +Time SYSTEM = SystemTime.getInstance(); Review Comment: Hey, is there a reason this isn't the canonical singleton instance? Why would we want two different ways to get the singleton `Time.SYSTEM` and `SystemTime.getInstance()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]
ocadaruma commented on code in PR #15993: URL: https://github.com/apache/kafka/pull/15993#discussion_r1625200232 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -305,22 +341,23 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. + * + * Checkpoint-flushing is done asynchronously. */ public void truncateFromEnd(long endOffset) { lock.writeLock().lock(); try { -Optional epochEntry = latestEntry(); -if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { -List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - -// We intentionally don't force flushing change to the device here because: +List removedEntries = truncateFromEnd(epochs, endOffset); +if (!removedEntries.isEmpty()) { +// We flush the change to the device in the background because: // - To avoid fsync latency // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives // * This method is called by ReplicaFetcher threads, which could block replica fetching // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. -// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by -// another truncateFromEnd call on log loading procedure so it won't be a problem -writeToFile(false); +// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. +// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by +// another truncateFromEnd call on log loading procedure, so it won't be a problem +scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); Review Comment: Not strictly necessary. I just followed some of existing scheduled tasks's convention (e.g. https://github.com/apache/kafka/blob/c24f94936d4370b2a3f3bfad42c56198208079b4/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L571) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]
ocadaruma commented on code in PR #15993: URL: https://github.com/apache/kafka/pull/15993#discussion_r1625200232 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -305,22 +341,23 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. + * + * Checkpoint-flushing is done asynchronously. */ public void truncateFromEnd(long endOffset) { lock.writeLock().lock(); try { -Optional epochEntry = latestEntry(); -if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { -List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - -// We intentionally don't force flushing change to the device here because: +List removedEntries = truncateFromEnd(epochs, endOffset); +if (!removedEntries.isEmpty()) { +// We flush the change to the device in the background because: // - To avoid fsync latency // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives // * This method is called by ReplicaFetcher threads, which could block replica fetching // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. -// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by -// another truncateFromEnd call on log loading procedure so it won't be a problem -writeToFile(false); +// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. +// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by +// another truncateFromEnd call on log loading procedure, so it won't be a problem +scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); Review Comment: Not strictly necessary but I just followed some of existing scheduled tasks's convention (e.g. https://github.com/apache/kafka/blob/c24f94936d4370b2a3f3bfad42c56198208079b4/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L571) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]
ocadaruma commented on code in PR #15993: URL: https://github.com/apache/kafka/pull/15993#discussion_r1625196957 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -608,25 +613,23 @@ public boolean isCancelled() { } /** - * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset + * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset. + * + * Visible for testing. * * @param log The actual log from where to take the leader-epoch checkpoint - * @param startOffset The start offset of the checkpoint file (exclusive in the truncation). + * @param startOffset The start offset of the epoch entries (exclusive). Review Comment: My bad, that's right -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers [kafka]
edoardocomar commented on PR #16151: URL: https://github.com/apache/kafka/pull/16151#issuecomment-2146353225 > If someone wants to raise the timeout for this one operation, I don't think that we should require them to increase the client-global request.timeout.ms. I agree with that. Hopefully the commit addresses your concerns. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
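The point about not raising the client-global request.timeout.ms can be illustrated with the per-call timeout override on the operation's options object. A hedged example; the bootstrap address and transactional id are placeholders:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FenceProducersOptions;

public class FenceProducersTimeoutExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Override the timeout for this one operation instead of changing request.timeout.ms globally.
            FenceProducersOptions options = new FenceProducersOptions().timeoutMs(60_000);
            admin.fenceProducers(Collections.singleton("my-transactional-id"), options).all().get();
        }
    }
}
```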
Re: [PR] KAFKA-10787: Apply spotless to transaction-coordinator and server-common [kafka]
chia7712 commented on PR #16172: URL: https://github.com/apache/kafka/pull/16172#issuecomment-2146344557 @gongxuanzhang please fix the conflicts -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]
junrao commented on code in PR #15993: URL: https://github.com/apache/kafka/pull/15993#discussion_r1625162518 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -42,10 +45,15 @@ * * Leader Epoch = epoch assigned to each leader by the controller. * Offset = offset of the first message in each epoch. + * + * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously. Review Comment: Perhaps name truncateFromStart and truncateFromEnd to sth like truncateFromStartAsyncFlush and truncateFromEndAsyncFlush to make it clear? ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -386,11 +442,39 @@ public OptionalInt epochForOffset(long offset) { } } -public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint leaderEpochCheckpoint) { +/** + * Returns a new LeaderEpochFileCache which contains same + * epoch entries with replacing backing checkpoint file. + * @param leaderEpochCheckpoint the new checkpoint file + * @return a new LeaderEpochFileCache instance + */ +public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpointFile leaderEpochCheckpoint) { +lock.readLock().lock(); +try { +return new LeaderEpochFileCache(epochEntries(), +topicPartition, +leaderEpochCheckpoint, +scheduler); +} finally { +lock.readLock().unlock(); +} +} + +/** + * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset + * @param startOffset The start offset of the epoch entries (exclusive). Review Comment: From the caller's perspective, start offset is inclusive and end offset in exclusive. ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -305,22 +341,23 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. + * + * Checkpoint-flushing is done asynchronously. */ public void truncateFromEnd(long endOffset) { lock.writeLock().lock(); try { -Optional epochEntry = latestEntry(); -if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { -List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - -// We intentionally don't force flushing change to the device here because: +List removedEntries = truncateFromEnd(epochs, endOffset); +if (!removedEntries.isEmpty()) { +// We flush the change to the device in the background because: // - To avoid fsync latency // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives // * This method is called by ReplicaFetcher threads, which could block replica fetching // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. -// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by -// another truncateFromEnd call on log loading procedure so it won't be a problem -writeToFile(false); +// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. 
+// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by +// another truncateFromEnd call on log loading procedure, so it won't be a problem +scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); Review Comment: Hmm, why do we need to add a trailing - in "leader-epoch-cache-flush-"? ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -128,15 +162,17 @@ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { } } -private List removeFromEnd(Predicate predicate) { +private static List removeFromEnd( Review Comment: Could we fold this method into the caller since it only has 1 line? Ditto for `removeFromStart`. ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -608,25 +613,23 @@ public boolean isCancelled() { } /** - * Returns the leader epoch checkpoint by truncating with the given start[exclusive]
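[Editor's note] To make the flush behaviour discussed in the review above concrete, here is a minimal, self-contained sketch of the pattern: truncation mutates the in-memory epoch entries under a lock and schedules the checkpoint write on a background thread instead of flushing inline, while (as the diff notes) assign() still flushes synchronously. The class and field names are illustrative stand-ins, not the actual LeaderEpochFileCache code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only: epoch entries live in memory under a read/write lock,
// and truncation schedules the checkpoint flush asynchronously so the caller
// (e.g. a replica fetcher thread) does not pay fsync latency.
public class AsyncFlushCacheSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<long[]> epochEntries = new ArrayList<>(); // {epoch, startOffset}
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void truncateFromEnd(long endOffset) {
        lock.writeLock().lock();
        try {
            boolean removed = epochEntries.removeIf(entry -> entry[1] >= endOffset);
            if (removed) {
                // Flush in the background; a synchronous flush still happens on assign(),
                // so the checkpoint may hold stale entries but never misses new ones.
                scheduler.execute(this::writeToCheckpointFile);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    private void writeToCheckpointFile() {
        // In the real cache this rewrites the leader-epoch checkpoint file on disk.
    }
}
```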
[PR] KAFKA-16740: Adding skeleton code for Share Fetch and Acknowledge RPC [kafka]
apoorvmittal10 opened a new pull request, #16184: URL: https://github.com/apache/kafka/pull/16184 The PR adds skeleton code for the Share Fetch and Acknowledge RPCs. The changes include: 1. Defining the RPCs in KafkaApis.scala 2. Adding a new SharePartitionManager class which handles those RPCs 3. Adding a SharePartition class which manages in-memory record states for fetched data. Note: The new classes live in the `core` module because they depend on core classes like `ReplicaManager`, so they cannot be moved to the `server` module. Subsequent PRs will add the implementation and test cases for this functionality. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2146294482 @lianetm @cadonna—this is ready for another review. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16192: Introduce usage of flexible records to coordinators [kafka]
jolshan opened a new pull request, #16183: URL: https://github.com/apache/kafka/pull/16183 This change adds transaction.version (part of KIP-1022). New transaction version 1 is introduced to support writing flexible fields in transaction state log messages. Transaction version 2 is created in anticipation of further KIP-890 changes. Neither is made production ready. Tests for the new transaction version and new MV are added. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]
chia7712 commented on PR #16169: URL: https://github.com/apache/kafka/pull/16169#issuecomment-2146219949 > Did we mean to totally remove the null check here? Pardon me, this issue is about the "update" of cipherInformation, and that is what this PR fixes. If you would prefer to keep the existing checks, or to add more, I can file a follow-up PR to add consistent, explicit checks to both the test and production code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16858: Throw DataException from validateValue on array and map schemas without inner schemas [kafka]
gharris1727 commented on PR #16161: URL: https://github.com/apache/kafka/pull/16161#issuecomment-2146189325 Hey @C0urante @yashmayya Could either of you PTAL? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -270,30 +326,27 @@ void testPollResultTimer() { void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); + consumerNetworkThread.runOnce(); // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} +void testEnsureEventsAreCompleted() { Review Comment: Not introduced by this PR but since we're improving here, I find that this test does not bring any value, because it's testing something that it's not the responsibility of the `ConsumerNetworkThread`, so we end up testing something we're mocking ourselves with the `doAnswer`. What do you think? I had suggested to remove it completely. See my original comment about it on this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877). If you agree I would say we remove it instead of keep having to maintain it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException
[ https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-16105. - Resolution: Fixed > Reassignment of tiered topics is failing due to RemoteStorageException > -- > > Key: KAFKA-16105 > URL: https://issues.apache.org/jira/browse/KAFKA-16105 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Reporter: Anatolii Popov >Priority: Critical > Labels: tiered-storage > Fix For: 3.8.0, 3.7.1, 3.9.0 > > > When partition reassignment is happening for a tiered topic in most of the > cases it's stuck with RemoteStorageException's on follower nodes saying that > it can not construct remote log auxilary state: > > {code:java} > [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, > fetcherId=2] Error building remote log auxiliary state for test-24 > (kafka.server.ReplicaFetcherThread) > > org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't > build the state from remote store for partition: test-24, currentLeaderEpoch: > 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the > previous remote log segment metadata was not found > at > kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259) > at > kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106) > at > kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413) > at > scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403) > at > scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129) > at > scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130) > {code} > > Scenario: > A cluster of 3 nodes with a single topic with 30 partitions. All partitions > have tiered segments. > Adding 3 more nodes to the cluster and making a reassignment to move all the > data to new nodes. > Behavior: > For most of the partitions reassignment is happening smoothly. > For some of the partitions when a new node starts to get assignments it reads > __remote_log_metadata topic and tries to initialize the metadata cache on > records with COPY_SEGMENT_STARTED. 
If it's reading such a message for the > partition before the partition was assigned to this specific node it ignores > the message, so skips the cache initialization and marks the partition as > assigned. So reassignment is stuck since
Re: [PR] KAFKA-16105: Reset read offsets when seeking to beginning in TBRLMM [kafka]
gharris1727 commented on PR #15165: URL: https://github.com/apache/kafka/pull/15165#issuecomment-2146174509 Thank you @AnatolyPopov for the PR, and thanks @showuon and @kamalcph for your reviews! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -270,30 +326,27 @@ void testPollResultTimer() { void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); + consumerNetworkThread.runOnce(); // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} +void testEnsureEventsAreCompleted() { Review Comment: Not introduced by this PR but since we're improving here, I find that this test does not bring any value, because it's testing something that it's not the responsibility of the `ConsumerNetworkThread`, so we end up testing something we're mocking ourselves with the `doAnswer`. What do you think? I had suggested to remove it completely. See my original comment about it on this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set [kafka]
jsancio commented on code in PR #16079: URL: https://github.com/apache/kafka/pull/16079#discussion_r1625072194 ## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ## @@ -272,20 +280,77 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.empty(), state.highWatermark()); assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); -assertFalse(state.updateLocalState(new LogOffsetMetadata(20L))); +assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } +@Test +public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { +int node1 = 1; +int node2 = 2; +Set originalVoterSet = mkSet(localId, node1, node2); +LeaderState state = newLeaderState(originalVoterSet, 10L); +assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); +assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); +assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); +assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + +// removing node1 should not decrement HW to 10L +Set voterSetWithoutNode1 = mkSet(localId, node2); +assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutNode1)); +assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + +// HW cannot change until after node2 catches up to last HW +assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L))); +assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); +assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); +assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + +// HW should update to 16L +assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); +assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); +} + +@Test +public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() { +int node1 = 1; +int node2 = 2; +Set originalVoterSet = mkSet(localId, node1, node2); +LeaderState state = newLeaderState(originalVoterSet, 10L); +assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); +assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); +assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); +assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + +// removing leader should not decrement HW to 10L +Set voterSetWithoutLeader = mkSet(node1, node2); +assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutLeader)); +assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + +// HW cannot change until node2 catches up to last HW Review Comment: Before checking this, let's increase node1's LEO to 16 and show that it doesn't increase the HWM. 
## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ## @@ -272,20 +280,77 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.empty(), state.highWatermark()); assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); -assertFalse(state.updateLocalState(new LogOffsetMetadata(20L))); +assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } +@Test +public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { Review Comment: Are you planning to add a test for adding a follower to the voter set? -- This is
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
cmccabe commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1625074887 ## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ## @@ -306,6 +309,113 @@ class RaftManagerTest { } + class ReconfigurationTestContext( Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2146155510 Hey @brenden20, very nice improvement! Left a few comments. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -270,30 +326,27 @@ void testPollResultTimer() { void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); + consumerNetworkThread.runOnce(); // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} +void testEnsureEventsAreCompleted() { Review Comment: Not introduced by this PR but since we're improving here, I find that this test does not bring any value, because it's testing something that it's not the responsibility of the `ConsumerNetworkThread`, so in the end we end up testing something we're mocking ourselves with the `doAnswer`. What do you think? I had suggested to remove it completely. See my original comment about it on this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException
[ https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16105: Fix Version/s: 3.8.0 3.7.1 3.9.0 > Reassignment of tiered topics is failing due to RemoteStorageException > -- > > Key: KAFKA-16105 > URL: https://issues.apache.org/jira/browse/KAFKA-16105 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Reporter: Anatolii Popov >Priority: Critical > Labels: tiered-storage > Fix For: 3.8.0, 3.7.1, 3.9.0 > > > When partition reassignment is happening for a tiered topic in most of the > cases it's stuck with RemoteStorageException's on follower nodes saying that > it can not construct remote log auxilary state: > > {code:java} > [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, > fetcherId=2] Error building remote log auxiliary state for test-24 > (kafka.server.ReplicaFetcherThread) > > org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't > build the state from remote store for partition: test-24, currentLeaderEpoch: > 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the > previous remote log segment metadata was not found > at > kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259) > at > kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106) > at > kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413) > at > scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403) > at > scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129) > at > scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130) > {code} > > Scenario: > A cluster of 3 nodes with a single topic with 30 partitions. All partitions > have tiered segments. > Adding 3 more nodes to the cluster and making a reassignment to move all the > data to new nodes. > Behavior: > For most of the partitions reassignment is happening smoothly. > For some of the partitions when a new node starts to get assignments it reads > __remote_log_metadata topic and tries to initialize the metadata cache on > records with COPY_SEGMENT_STARTED. 
If it's reading such a message for the > partition before the partition was assigned to this specific node it ignores > the message, so skips the cache initialization and marks the partition as >
Re: [PR] KAFKA-16525; Dynamic KRaft network manager and channel [kafka]
cmccabe merged PR #15986: URL: https://github.com/apache/kafka/pull/15986 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16362) Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide
[ https://issues.apache.org/jira/browse/KAFKA-16362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-16362. - Fix Version/s: 3.8.0 Resolution: Fixed > Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide > > > Key: KAFKA-16362 > URL: https://issues.apache.org/jira/browse/KAFKA-16362 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 3.7.0 >Reporter: Greg Harris >Assignee: Ramin Gharib >Priority: Trivial > Labels: newbie++ > Fix For: 3.8.0 > > > The implementation of KStreamKStreamJoin has several places that the compiler > emits warnings for, that are later suppressed or ignored: > * LeftOrRightValue.make returns a raw LeftOrRightValue without generic > arguments, because the generic type arguments depend on the boolean input. > * Calling LeftOrRightValue includes an unchecked cast before inserting the > record into the outerJoinStore > * emitNonJoinedOuterRecords swaps the left and right values, and performs an > unchecked cast > These seem to be closely related to the isLeftSide variable, which makes the > class behave differently whether it is present on the left or right side of a > join. > We should figure out if these warnings can be eliminated by a refactor, > perhaps into KStreamKstreamJoin.Left and KStreamKStreamJoin.Right, or with > some generic arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625060556 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -270,30 +326,27 @@ void testPollResultTimer() { void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); + consumerNetworkThread.runOnce(); // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} +void testEnsureEventsAreCompleted() { +Cluster cluster = mock(Cluster.class); +when(metadata.fetch()).thenReturn(cluster); -@Test -void testEnsureMetadataUpdateOnPoll() { -MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); -client.prepareMetadataUpdate(metadataResponse); -metadata.requestUpdate(false); -consumerNetworkThread.runOnce(); -verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); -} +List list = new ArrayList<>(); +list.add(new Node(0, "host", 0)); +when(cluster.nodes()).thenReturn(list); Review Comment: what about simplifying to a single `when(cluster.nodes()).thenReturn(Collections.singletonList(new Node(0, "host", 0)));` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625059147 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -289,10 +289,11 @@ private void closeInternal(final Duration timeout) { } } +// Add test to see if poll() is run once with timer of 0 Review Comment: is this something you intend to do on this PR or separately? Either way, I would suggest we remove the comment and address it in this PR or have a jira to follow-up separately if needed please. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16792: -- Description: Enable the following unit tests for the new async consumer in KafkaConsumerTest: - testCurrentLag - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions - testPollReturnsRecords - testResetToCommittedOffset - testResetUsingAutoResetPolicy was: Enable the following unit tests for the new async consumer in KafkaConsumerTest: - testFetchStableOffsetThrowInPoll - testCurrentLag - testListOffsetShouldUpdateSubscriptions - testResetToCommittedOffset - testResetUsingAutoResetPolicy - testPollReturnsRecords > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testCurrentLag > - testFetchStableOffsetThrowInPoll > - testListOffsetShouldUpdateSubscriptions > - testPollReturnsRecords > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625054713 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() { @Test void testCleanupInvokesReaper() { +LinkedList queue = new LinkedList<>(); +when(networkClientDelegate.unsentRequests()).thenReturn(queue); Review Comment: totally needed, but heads-up, there is another [PR](https://github.com/apache/kafka/pull/16156) in-flight where we're trying to change the check behind this, so we'll need to update this expectation here if that goes in first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215 ] Kirk True edited comment on KAFKA-16792 at 6/3/24 9:09 PM: --- The following don't work for reasons other than the timeout: - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}} - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}} - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs was (Author: kirktrue): These tests work with KAFKA-16200: - testResetUsingAutoResetPolicy - testFetchStableOffsetThrowInCommitted (wasn't in the above list) The following still don't work: - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}} - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}} - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs - testResetToCommittedOffset > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testFetchStableOffsetThrowInPoll > - testCurrentLag > - testListOffsetShouldUpdateSubscriptions > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > - testPollReturnsRecords > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
jsancio commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1625049639 ## clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java: ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Locale; +import java.util.Objects; + +/** + * An endpoint for a raft quorum voter. + */ +@InterfaceStability.Stable +public class RaftVoterEndpoint { +private final String name; +private final String host; +private final int port; +private final String securityProtocol; Review Comment: > Is the expectation that the security protocol is found by looking up the endpoint name in `listener.security.protocol.map`? What happens if it's not there? Do we fail or fall back to `PLAINTEXT`? It will use whatever is returned by `KafkaConfig.effectiveListenerSecurityProtocolMap`. Yes, it does look like that code currently returns PLAINTEXT if there is not security mapping for the controller listener. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215 ] Kirk True edited comment on KAFKA-16792 at 6/3/24 9:03 PM: --- These tests work with KAFKA-16200: - testResetUsingAutoResetPolicy - testFetchStableOffsetThrowInCommitted (wasn't in the above list) The following still don't work: - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}} - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}} - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs - testResetToCommittedOffset was (Author: kirktrue): These tests work with KAFKA-16200: - testResetUsingAutoResetPolicy - testFetchStableOffsetThrowInCommitted (wasn't in the above list) The following still don't work: - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}} - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs - testResetToCommittedOffset > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testFetchStableOffsetThrowInPoll > - testCurrentLag > - testListOffsetShouldUpdateSubscriptions > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > - testPollReturnsRecords > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625048146 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); } @Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); +public void testApplicationEvent() { Review Comment: Not introduced by this PR, but reviewing this one I noticed that we have lots of similar tests for checking that runOnce processes the events (ex. testResetPositionsEventIsProcessed, testSyncCommitEvent, testAsyncCommitEvent...), all doing the same (add event, run once, check is processed). Could we parametrize this and have a single test maybe? (same shape as this one but receiving the event as param). Should allow us to remove lots that, in the end this component does not care about the specific events, it's just responsible for processing whatever event is added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
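[Editor's note] As a rough illustration of the parametrization suggested above, the sketch below collapses the repeated "add event, run once, verify processed" shape into one JUnit 5 parameterized test. The event types and the in-memory queue/processor are hypothetical stand-ins for the consumer internals, not the real classes.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ParameterizedEventTestSketch {

    interface Event { }
    static final class PollEvent implements Event { }
    static final class MetadataUpdateEvent implements Event { }

    private final Queue<Event> queue = new ArrayDeque<>();
    private final Set<Event> processed = new HashSet<>();

    // Stand-in for runOnce(): drain the queue into the "processor".
    private void runOnce() {
        Event event;
        while ((event = queue.poll()) != null) {
            processed.add(event);
        }
    }

    static Stream<Event> events() {
        return Stream.of(new PollEvent(), new MetadataUpdateEvent());
    }

    @ParameterizedTest
    @MethodSource("events")
    void eventIsProcessedByRunOnce(Event event) {
        queue.add(event);
        runOnce();
        assertTrue(processed.contains(event));
    }
}
```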
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
jsancio commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1625041690 ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -318,4 +329,28 @@ class KafkaRaftManager[T]( override def voterNode(id: Int, listener: String): Option[Node] = { client.voterNode(id, listener).toScala } + + def verifyCommonFields( +providedClusterId: String, + ): Unit = { +if (!providedClusterId.equals(clusterId)) { + throw new InconsistentClusterIdException("Provided cluster ID " + providedClusterId + Review Comment: > Hmm, your URL doesn't work for me. This is the URL/code I was trying to send: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ControllerApis.scala#L147-L151 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
ableegoldman commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1625023687 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe + "tasks for source topics vs changelog topics."); } -final Set logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet()); -final Set allTopicPartitions = new HashSet<>(); +final Set topicsRequiringRackInfo = new HashSet<>(); +final AtomicBoolean rackInformationFetched = new AtomicBoolean(false); +final Runnable fetchRackInformation = () -> { +if (!rackInformationFetched.get()) { +RackUtils.annotateTopicPartitionsWithRackInfo(cluster, Review Comment: very small nit (tack onto any followup PR): weird line break, either keep everything on one line or move the `cluster` variable to the 2nd line with the other params ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe + "tasks for source topics vs changelog topics."); } -final Set logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet()); -final Set allTopicPartitions = new HashSet<>(); +final Set topicsRequiringRackInfo = new HashSet<>(); +final AtomicBoolean rackInformationFetched = new AtomicBoolean(false); +final Runnable fetchRackInformation = () -> { +if (!rackInformationFetched.get()) { +RackUtils.annotateTopicPartitionsWithRackInfo(cluster, +internalTopicManager, topicsRequiringRackInfo); +rackInformationFetched.set(true); +} +}; + final Map> topicPartitionsForTask = new HashMap<>(); +final Set logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet()); logicalTaskIds.forEach(taskId -> { final Set topicPartitions = new HashSet<>(); for (final TopicPartition topicPartition : sourcePartitionsForTask.get(taskId)) { final boolean isSource = true; final boolean isChangelog = changelogPartitionsForTask.get(taskId).contains(topicPartition); final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition( -topicPartition, isSource, isChangelog, null); -allTopicPartitions.add(racklessTopicPartition); +topicPartition, isSource, isChangelog, fetchRackInformation); +topicsRequiringRackInfo.add(racklessTopicPartition); topicPartitions.add(racklessTopicPartition); } for (final TopicPartition topicPartition : changelogPartitionsForTask.get(taskId)) { final boolean isSource = sourcePartitionsForTask.get(taskId).contains(topicPartition); final boolean isChangelog = true; final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition( -topicPartition, isSource, isChangelog, null); -allTopicPartitions.add(racklessTopicPartition); +topicPartition, isSource, isChangelog, fetchRackInformation); +if (publicAssignmentConfigs.numStandbyReplicas() > 0) { Review Comment: Note that active tasks will also read from changelog topics (though only during the restore phase), so we should be adding changelogs to the `topicsRequiringRackInfo` set even if there are no standbys configured Again you can tack this onto PR #17 or whatever PR is next ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -573,14 +586,24 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe )); return new DefaultApplicationState( 
-assignmentConfigs.toPublicAssignmentConfigs(), +publicAssignmentConfigs, logicalTasks, clientMetadataMap ); } -private static void processStreamsPartitionAssignment(final Map clientMetadataMap, - final TaskAssignment taskAssignment) { +private void processStreamsPartitionAssignment(final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor, + final TaskAssignment taskAssignment, + final AssignmentError assignmentError, +
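[Editor's note] The lazy rack-information fetch in the diff above boils down to a "run the expensive lookup at most once" Runnable. Below is a generic sketch of that pattern; it uses compareAndSet rather than the separate get/set calls so it stays safe even under concurrent invocation, and all names are illustrative.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class RunOnceSketch {
    // Wraps an expensive action so that only the first invocation actually runs it.
    static Runnable runOnce(Runnable expensiveLookup) {
        AtomicBoolean done = new AtomicBoolean(false);
        return () -> {
            if (done.compareAndSet(false, true)) {
                expensiveLookup.run();
            }
        };
    }

    public static void main(String[] args) {
        Runnable fetchRackInfo = runOnce(() -> System.out.println("fetching rack metadata"));
        fetchRackInfo.run(); // performs the lookup
        fetchRackInfo.run(); // no-op afterwards
    }
}
```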
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625034602 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); Review Comment: This test seems to leave out one of the core actions of the `runOnce`: 1. poll managers (this generates the requests) -> covered by the test 2. add newly generated requests are added to the network client -> not covered 3. poll network client to send the requests -> covered by the test Step 3 wouldn't have the effect we want (send the request) if step 2 does not happen, so what about adding a `verify(networkClientDelegate).addAll(..` before verifying the network client poll, to make sure we cover the whole flow of how requests move from the managers to the network layer on run once? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
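[Editor's note] For the ordering check suggested above, one option is Mockito's InOrder, which can assert that the requests produced by the managers are handed to the network client before it is polled. The NetworkDelegate interface below is a hypothetical stand-in for NetworkClientDelegate, so this is a sketch of the verification style rather than the actual test.

```java
import java.util.List;
import org.mockito.InOrder;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

class RunOnceOrderingSketch {

    // Hypothetical stand-in for the real NetworkClientDelegate.
    interface NetworkDelegate {
        void addAll(List<?> requests);
        void poll(long timeoutMs, long currentTimeMs);
    }

    static void verifyAddAllBeforePoll(NetworkDelegate delegate) {
        InOrder order = inOrder(delegate);
        order.verify(delegate).addAll(anyList());          // requests are enqueued first
        order.verify(delegate).poll(anyLong(), anyLong()); // then the network client is polled
    }

    public static void main(String[] args) {
        NetworkDelegate delegate = mock(NetworkDelegate.class);
        delegate.addAll(List.of());
        delegate.poll(0L, 0L);
        verifyAddAllBeforePoll(delegate);
    }
}
```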
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625032096 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; Review Comment: I think we're missing the expectations for the requestManagers.entries() to make sure there are some managers and we actually verify something, so we need `when(requestManagers.entries()).thenReturn(..` with some managers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16105: Reset read offsets when seeking to beginning in TBRLMM [kafka]
gharris1727 merged PR #15165: URL: https://github.com/apache/kafka/pull/15165 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16884) Refactor RemoteLogManagerConfig with a Builder
[ https://issues.apache.org/jira/browse/KAFKA-16884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851809#comment-17851809 ] Muralidhar Basani commented on KAFKA-16884: --- [~gharris1727] I agree with you about aligning it with the other config classes. As suggested, we aim to make the following changes: * Extend AbstractConfig instead of accepting one as an argument * Avoid the use of a mutable ConfigDef > Refactor RemoteLogManagerConfig with a Builder > -- > > Key: KAFKA-16884 > URL: https://issues.apache.org/jira/browse/KAFKA-16884 > Project: Kafka > Issue Type: Improvement >Reporter: Muralidhar Basani >Assignee: Muralidhar Basani >Priority: Minor > > The {{RemoteLogManagerConfig}} class has a very large constructor with 27 > fields, making it difficult to read and work with. Introducing a builder > class would be beneficial. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them [kafka]
C0urante commented on PR #16122: URL: https://github.com/apache/kafka/pull/16122#issuecomment-2146088065 Thanks @mimaison and apologies for the failing test. This should be ready now! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
C0urante commented on PR #16053: URL: https://github.com/apache/kafka/pull/16053#issuecomment-2146086988 Thanks @gharris1727! I've updated the patch to apply to both distributed and standalone mode, and fixed failing unit tests. I've also tweaked the logic so that applied connector configs are tracked regardless of whether the worker has completed startup or not, which should eliminate unnecessary cluster churn on upgrades. This still "primes the pump" for future connector config updates though, which I believe should make us both very happy! Going through the scenarios you listed in your analysis (which was correct based on the implementation you reviewed): - The first one (steps 1-3) should never occur now. If a connector has generated task configs after its latest configuration was submitted by the user, the worker should have an applied config for it; if the connector has not generated task configs, then we should force a reconfiguration attempt, just like we would today. - The second one (steps 4-6) should remain correct ("Rebalancing a connector among existing workers in a cluster won't force new task configs to be generated"), because workers must be caught up on the config topic before joining a group, and before resuming work after rejoining a group - The third one (steps 7-8) should remain as correct as it was in the original implementation. We eagerly transform applied connector configs as soon as the corresponding set of task configs is generated, which does mean that there may be some lag time between when the connector config was submitted and when we performed the transform. Ultimately though, there is an eventual consistency-esque guarantee that if a config provider changes periodically, new task configs will at some point be forcibly written to the config topic. - The fourth one (steps 9-11) should remain correct as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]
jolshan commented on PR #16169: URL: https://github.com/apache/kafka/pull/16169#issuecomment-2146083639 Did we mean to totally remove the null check here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16105: Reassignment fix [kafka]
gharris1727 commented on PR #15165: URL: https://github.com/apache/kafka/pull/15165#issuecomment-2146082430 Test failures appear unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16884) Refactor RemoteLogManagerConfig with a Builder
[ https://issues.apache.org/jira/browse/KAFKA-16884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851807#comment-17851807 ] Greg Harris commented on KAFKA-16884: - I don't think the builder pattern is a good fit for configuration classes like this one; it would be a deviation in form from all the other config classes. I see two odd things about the RLMC that may contribute to this: it doesn't extend AbstractConfig, it instead accepts one as an argument, and it exposes its (mutable!) CONFIG_DEF to be defined directly in KafkaConfig. I have not seen that before, it's very unusual, and mutable ConfigDefs have been phased out in Connect. If the large constructor is truly a problem (which I'm not convinced of, as there are only 2 call sites for it right now) then I think it would be reasonable to solve it by bringing the whole class into alignment with the other config classes, rather than switching to the builder pattern. > Refactor RemoteLogManagerConfig with a Builder > -- > > Key: KAFKA-16884 > URL: https://issues.apache.org/jira/browse/KAFKA-16884 > Project: Kafka > Issue Type: Improvement >Reporter: Muralidhar Basani >Assignee: Muralidhar Basani >Priority: Minor > > The {{RemoteLogManagerConfig}} class has a very large constructor with 27 > fields, making it difficult to read and work with. Introducing a builder > class would be beneficial. -- This message was sent by Atlassian Jira (v8.20.10#820010)
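[Editor's note] A hedged sketch of the "align with the other config classes" suggestion: the config class extends AbstractConfig, owns an immutable ConfigDef, and exposes typed getters. The property name and default below are purely illustrative, not the actual RemoteLogManagerConfig definitions.

```java
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

// Illustrative sketch: extend AbstractConfig instead of accepting one as a
// constructor argument, and keep the ConfigDef private and immutable.
public class ExampleRemoteLogConfig extends AbstractConfig {
    public static final String COPY_THREADS_PROP = "remote.log.copy.threads";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(COPY_THREADS_PROP, ConfigDef.Type.INT, 10,
                ConfigDef.Importance.MEDIUM, "Number of copier threads (illustrative).");

    public ExampleRemoteLogConfig(Map<?, ?> props) {
        super(CONFIG_DEF, props);
    }

    public int copyThreads() {
        return getInt(COPY_THREADS_PROP);
    }
}
```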
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
cmccabe commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1625013334 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -1080,4 +1083,39 @@ class ControllerApis( requestThrottleMs => new AssignReplicasToDirsResponse(reply.setThrottleTimeMs(requestThrottleMs))) } } + + def handleAddRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = { +authHelper.authorizeClusterOperation(request, ALTER) +val addRequest = request.body[AddRaftVoterRequest] +raftManager.handleAddVoter(addRequest.data()) +requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new AddRaftVoterResponse(new AddRaftVoterResponseData(). +setThrottleTimeMs(requestThrottleMs). +setErrorCode(0.toShort). +setErrorMessage(null))) +CompletableFuture.completedFuture[Unit](()) Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
cmccabe commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1625013008 ## clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java: ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Locale; +import java.util.Objects; + +/** + * An endpoint for a raft quorum voter. + */ +@InterfaceStability.Stable +public class RaftVoterEndpoint { +private final String name; +private final String host; +private final int port; +private final String securityProtocol; Review Comment: > Did you consider using o.a.k.c.Endpoints? This struct has a similar structure to Endpoints. `org.apache.kafka.common.Endpoint` is not intended to be used as part of the Admin Client API. That class drags in the non-public class `org.apache.kafka.common.security.auth.SecurityProtocol`. Additionally we may want to change it in the future, which we could not do if it were part of the admin API. (`org.apache.kafka.common.Endpoint` did get dragged into the Authorizer API, which I think was a mistake, but that's a separate discussion. And not many people have ever written Authorizers, so changing that stuff is not impossible) > During the KIP discussion we agreed to not add security protocol to the AddRaftVoter request. Mainly because it is not needed and that there is a requirement that the listener name, security protocol and the protocol configuration must match in all of the voters if they exist. Is the expectation that the security protocol is found by looking up the endpoint name in `listener.security.protocol.map`? What happens if it's not there? Do we fail or fall back to `PLAINTEXT`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
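On the open question above, a rough sketch of what a lookup against a listener-name-to-protocol mapping could look like, with the two possible behaviors (fail fast vs. fall back to PLAINTEXT) made explicit. This is purely illustrative and does not reflect how the PR or the broker actually resolves the protocol; the map simply mirrors the shape of listener.security.protocol.map, and all class and method names are invented.

```java
import java.util.Map;
import java.util.Optional;

// Illustrative only: resolving a security protocol for a listener name.
public final class ListenerProtocolResolver {

    private final Map<String, String> listenerToProtocol;

    public ListenerProtocolResolver(Map<String, String> listenerToProtocol) {
        this.listenerToProtocol = listenerToProtocol;
    }

    // Option 1: fail fast when the listener name is unknown.
    public String requireProtocol(String listenerName) {
        String protocol = listenerToProtocol.get(listenerName);
        if (protocol == null) {
            throw new IllegalArgumentException("No security protocol mapped for listener " + listenerName);
        }
        return protocol;
    }

    // Option 2: fall back to PLAINTEXT when the listener name is unknown.
    public String protocolOrPlaintext(String listenerName) {
        return Optional.ofNullable(listenerToProtocol.get(listenerName)).orElse("PLAINTEXT");
    }
}
```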
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
cmccabe commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1625003964 ## core/src/test/resources/log4j.properties: ## @@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.kafka=WARN -log4j.logger.org.apache.kafka=WARN +log4j.logger.kafka=OFF +log4j.logger.org.apache.kafka=OFF Review Comment: Sorry, will fix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16884) Refactor RemoteLogManagerConfig with a Builder
Muralidhar Basani created KAFKA-16884: - Summary: Refactor RemoteLogManagerConfig with a Builder Key: KAFKA-16884 URL: https://issues.apache.org/jira/browse/KAFKA-16884 Project: Kafka Issue Type: Improvement Reporter: Muralidhar Basani Assignee: Muralidhar Basani The {{RemoteLogManagerConfig}} class has a very large constructor with 27 fields, making it difficult to read and work with. Introducing a builder class would be beneficial. -- This message was sent by Atlassian Jira (v8.20.10#820010)
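For illustration only, a heavily trimmed sketch of the builder shape being proposed here, using a couple of invented stand-in fields rather than the real 27 constructor parameters; whether this is preferable to aligning the class with the AbstractConfig-style config classes is exactly what the comment earlier in this digest debates.

```java
// Hypothetical sketch of the proposed builder; field names are invented stand-ins.
public class RemoteLogManagerConfigSketch {

    private final boolean remoteStorageEnabled;
    private final String storageManagerClassName;

    private RemoteLogManagerConfigSketch(Builder builder) {
        this.remoteStorageEnabled = builder.remoteStorageEnabled;
        this.storageManagerClassName = builder.storageManagerClassName;
    }

    public static class Builder {
        private boolean remoteStorageEnabled = false;
        private String storageManagerClassName;

        public Builder remoteStorageEnabled(boolean enabled) {
            this.remoteStorageEnabled = enabled;
            return this;
        }

        public Builder storageManagerClassName(String className) {
            this.storageManagerClassName = className;
            return this;
        }

        public RemoteLogManagerConfigSketch build() {
            return new RemoteLogManagerConfigSketch(this);
        }
    }
}
```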
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
cmccabe commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1625003607 ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -318,4 +329,28 @@ class KafkaRaftManager[T]( override def voterNode(id: Int, listener: String): Option[Node] = { client.voterNode(id, listener).toScala } + + def verifyCommonFields( +providedClusterId: String, + ): Unit = { +if (!providedClusterId.equals(clusterId)) { + throw new InconsistentClusterIdException("Provided cluster ID " + providedClusterId + Review Comment: > If we throw here, this will get caught here and print an error message, right? https://github.com/apache/kafka/pull/16058/files#diff-91060c918c99d25342f625c146f14425716eda9d8fcfe1126b2c45feff388362R152-R154 Hmm, your URL doesn't work for me. As a general matter, exceptions not otherwise handled in ControllerApis.scala will just invoke `AbstractRequest.getErrorResponse`. > How about for now we just complete the requests with the unsupported version error responses? OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
C0urante commented on PR #16001: URL: https://github.com/apache/kafka/pull/16001#issuecomment-2146046642 Closing in favor of https://github.com/apache/kafka/pull/16053, which is simpler, more effective, and requires no changes to the config topic or internal REST API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
C0urante closed pull request #16001: KAFKA-9228: Restart tasks on runtime-only connector config changes URL: https://github.com/apache/kafka/pull/16001 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624997659 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -747,21 +748,19 @@ public void testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() { new TopicPartition("topic", 1), new OffsetAndMetadata(0)); -// Send sync offset commit request that fails with retriable error. -long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; -CompletableFuture commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); -completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); +// Send sync offset commit request. +long deadlineMs = time.milliseconds() + retryBackoffMs * 2; +CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs); -// Request retried after backoff, and fails with retriable again. Should not complete yet +// Make the first request fail with a retriable error. Should not complete yet // given that the request timeout hasn't expired. time.sleep(retryBackoffMs); completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); assertFalse(commitResult.isDone()); // Sleep to expire the request timeout. Request should fail on the next poll. time.sleep(retryBackoffMs); -NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); -assertEquals(0, res.unsentRequests.size()); +completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); Review Comment: This code is reverted back to the original form now that we prune expired requests that have been attempted at least once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624996705 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -210,7 +194,7 @@ private NetworkClientDelegate.UnsentRequest createUnsentRequest( private void handleError(final Throwable exception, final long completionTimeMs) { if (exception instanceof RetriableException) { -if (completionTimeMs >= expirationTimeMs) { +if (isExpired()) { Review Comment: I refactored the code so that `handleResponse()` calls `handleError()` if it hits an error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
cmccabe commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1624993551 ## clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java: ## @@ -1320,6 +1320,16 @@ public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMet return new ListClientMetricsResourcesResult(future); } +@Override +public AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set endpoints, AddRaftVoterOptions options) { +throw new UnsupportedOperationException("Not implemented yet"); +} + +@Override +public RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid voterDirectoryId, RemoveRaftVoterOptions options) { +throw new UnsupportedOperationException("Not implemented yet"); +} Review Comment: Maybe not. But Java requires them to be in MockAdminClient so that it will typecheck. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624992826 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -61,30 +62,28 @@ */ public class TopicMetadataRequestManager implements RequestManager { +private final Time time; private final boolean allowAutoTopicCreation; private final List inflightRequests; +private final int requestTimeoutMs; private final long retryBackoffMs; private final long retryBackoffMaxMs; private final Logger log; private final LogContext logContext; -public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) { +public TopicMetadataRequestManager(final LogContext context, final Time time, final ConsumerConfig config) { logContext = context; log = logContext.logger(getClass()); +this.time = time; inflightRequests = new LinkedList<>(); +requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); } @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { -// Prune any requests which have timed out -List expiredRequests = inflightRequests.stream() -.filter(req -> req.isExpired(currentTimeMs)) -.collect(Collectors.toList()); -expiredRequests.forEach(TopicMetadataRequestState::expire); - Review Comment: I've updated the logic in `CommitRequestManager` and `TopicMetadataRequestManager` to proactively prune expired `RequestState`s that have had at least one request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
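A loose sketch of the pruning idea described above, with invented names and types; the actual logic lives in CommitRequestManager and TopicMetadataRequestManager in this PR. The key point is that a request is only pruned once it has been attempted at least once, so every request gets at least one try on the wire.

```java
import java.util.List;
import java.util.stream.Collectors;

// Invented, simplified stand-in for the per-request state kept by the request managers.
class PendingRequest {
    final long deadlineMs;
    int attempts;

    PendingRequest(long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }

    boolean isExpired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}

class RequestPruner {
    // Remove requests that are past their deadline, but only once they have been attempted.
    static List<PendingRequest> prune(List<PendingRequest> inflight, long nowMs) {
        List<PendingRequest> expired = inflight.stream()
                .filter(r -> r.attempts > 0 && r.isExpired(nowMs))
                .collect(Collectors.toList());
        inflight.removeAll(expired);
        return expired; // callers would typically fail these with a TimeoutException
    }
}
```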
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
cmccabe commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1624991685 ## clients/src/main/resources/common/message/AddRaftVoterRequest.json: ## @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 80, + "type": "request", + "listeners": ["controller", "broker"], + "name": "AddRaftVoterRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "ClusterId", "type": "string", "versions": "0+" }, +{ "name": "TimeoutMs", "type": "int32", "versions": "0+" }, +{ "name": "VoterId", "type": "int32", "versions": "0+", + "about": "The replica id of the voter getting added to the topic partition" }, +{ "name": "VoterDirectoryId", "type": "uuid", "versions": "0+", + "about": "The directory id of the voter getting added to the topic partition" }, Review Comment: Right. And it would be easy to update the RPCs to have an optional topic structure, if and when we need one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624957099 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState { */ abstract CompletableFuture future(); -/** - * Complete the request future with a TimeoutException if the request timeout has been - * reached, based on the provided current time. - */ -void maybeExpire(long currentTimeMs) { -if (retryTimeoutExpired(currentTimeMs)) { -removeRequest(); -isExpired = true; -future().completeExceptionally(new TimeoutException(requestDescription() + -" could not complete before timeout expired.")); -} -} - /** * Build request with the given builder, including response handling logic. */ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder builder) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator()); +coordinatorRequestManager.coordinator(), +time.timer(requestTimeoutMs) +); request.whenComplete( (response, throwable) -> { long currentTimeMs = request.handler().completionTimeMs(); Review Comment: Agree that it's confusing, but for the record, I guess the "current" in the name may come from the point of view that this is the moment a request completes and we retrieve the completion time, so I could see it as "current" because of where it's called (but still +1 for a better name, simply `completionTimeMs`, which btw aligns with the `handleClientResponse` func param where it's used right below) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set [kafka]
jsancio commented on code in PR #16079: URL: https://github.com/apache/kafka/pull/16079#discussion_r1624780684 ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -341,9 +346,18 @@ public boolean updateReplicaState( state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset); } }); - -Optional leaderEndOffsetOpt = -voterStates.get(localId).endOffset; +Optional leaderEndOffsetOpt; +ReplicaState leaderVoterState = voterStates.get(localId); +ReplicaState leaderObserverState = observerStates.get(localId); +if (leaderVoterState != null) { +leaderEndOffsetOpt = leaderVoterState.endOffset; +} else if (leaderObserverState != null) { +// The leader is not guaranteed to be in the voter set when in the process of being removed from the quorum. +log.info("Updating end offset for leader {} which is also an observer.", localId); +leaderEndOffsetOpt = leaderObserverState.endOffset; +} else { +throw new IllegalStateException("Leader state not found for localId " + localId); +} Review Comment: In practice, isn't this the same as `getOrCreateReplicaState`? If so, we should just use that function here. ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -341,9 +346,18 @@ public boolean updateReplicaState( state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset); } }); - -Optional leaderEndOffsetOpt; Review Comment: If you move this right before the `if` statement and mark it as final, it makes it more obvious that this variable can take two different values. ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -445,6 +463,27 @@ private boolean isVoter(int remoteNodeId) { return voterStates.containsKey(remoteNodeId); } +// with Jose's changes this will probably make more sense as VoterSet +private void updateVoterSet(Set lastVoterSet) { +// Remove any voter state that is not in the last voter set. They become observers. +for (Iterator> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) { +Integer nodeId = iter.next().getKey(); +if (!lastVoterSet.contains(nodeId)) { +createObserverState(nodeId); +iter.remove(); +} +} + +// Add any voter state that is in the last voter set but not in the current voter set. They are removed from +// the observerStates if they exist. +for (int voterId : lastVoterSet) { +if (!voterStates.containsKey(voterId)) { +voterStates.put(voterId, new ReplicaState(voterId, false)); +observerStates.remove(voterId); Review Comment: This comment applies to this code block and the one above. When moving a replica state from voter to observer and vice versa, we shouldn't create a new replica state but instead reuse the replica state from the previous hash map. ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -445,6 +463,27 @@ private boolean isVoter(int remoteNodeId) { return voterStates.containsKey(remoteNodeId); } +// with Jose's changes this will probably make more sense as VoterSet +private void updateVoterSet(Set lastVoterSet) { +// Remove any voter state that is not in the last voter set. They become observers. +for (Iterator> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) { +Integer nodeId = iter.next().getKey(); +if (!lastVoterSet.contains(nodeId)) { +createObserverState(nodeId); Review Comment: The code in `clearInactiveObservers` removes any `ReplicaState` in `observerStates` that hasn't been updated after some time. We should make sure that the local replica (the leader) is not removed from `observerStates` if it exists.
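To make the "reuse the existing state" suggestion concrete, here is a rough sketch of transferring the same object between the two maps instead of constructing a new one; the class, field, and method names are invented and simplified, not the actual LeaderState code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration: when a node switches between voter and observer,
// move its existing state object rather than creating a new one, so recorded
// progress (e.g. the end offset) is not lost.
class ReplicaStateTransferSketch {

    static final class ReplicaState {
        final int nodeId;
        long endOffset = -1L;

        ReplicaState(int nodeId) {
            this.nodeId = nodeId;
        }
    }

    private final Map<Integer, ReplicaState> voterStates = new HashMap<>();
    private final Map<Integer, ReplicaState> observerStates = new HashMap<>();

    void demoteToObserver(int nodeId) {
        ReplicaState existing = voterStates.remove(nodeId);
        if (existing != null) {
            observerStates.put(nodeId, existing); // reuse, preserving endOffset
        }
    }

    void promoteToVoter(int nodeId) {
        ReplicaState existing = observerStates.remove(nodeId);
        voterStates.put(nodeId, existing != null ? existing : new ReplicaState(nodeId));
    }
}
```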
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]
jsancio commented on code in PR #16058: URL: https://github.com/apache/kafka/pull/16058#discussion_r1624911276 ## clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java: ## @@ -1320,6 +1320,16 @@ public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMet return new ListClientMetricsResourcesResult(future); } +@Override +public AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set endpoints, AddRaftVoterOptions options) { +throw new UnsupportedOperationException("Not implemented yet"); +} + +@Override +public RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid voterDirectoryId, RemoveRaftVoterOptions options) { +throw new UnsupportedOperationException("Not implemented yet"); +} Review Comment: I suspect that we will never implement these. Do you agree? ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -1080,4 +1083,39 @@ class ControllerApis( requestThrottleMs => new AssignReplicasToDirsResponse(reply.setThrottleTimeMs(requestThrottleMs))) } } + + def handleAddRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = { +authHelper.authorizeClusterOperation(request, ALTER) +val addRequest = request.body[AddRaftVoterRequest] +raftManager.handleAddVoter(addRequest.data()) +requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new AddRaftVoterResponse(new AddRaftVoterResponseData(). +setThrottleTimeMs(requestThrottleMs). +setErrorCode(0.toShort). +setErrorMessage(null))) +CompletableFuture.completedFuture[Unit](()) Review Comment: I think this is going to change in the real implementation. For KRaft requests we simply forward them to the raft thread/actor using `handleRaftRequest` and `raftManager.handleRequest`. How about for now we just send an error response with "unsupported version" and return a completed future? This comment applies to all of the new methods added to this type. ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -318,4 +329,28 @@ class KafkaRaftManager[T]( override def voterNode(id: Int, listener: String): Option[Node] = { client.voterNode(id, listener).toScala } + + def verifyCommonFields( +providedClusterId: String, + ): Unit = { +if (!providedClusterId.equals(clusterId)) { + throw new InconsistentClusterIdException("Provided cluster ID " + providedClusterId + Review Comment: This applies to all of the `throw new` added. If we throw here, this will get caught here and print an error message, right? https://github.com/apache/kafka/pull/16058/files#diff-91060c918c99d25342f625c146f14425716eda9d8fcfe1126b2c45feff388362R152-R154 How about for now we just complete the requests with the unsupported version error responses? Also, I have to double check but I think cluster ids are check by the `KafkaRaftClient` and not the `KafkaRaftManager`. We have this abstraction so we can test that behavior in the `KafkaRaftClientTest` suite or if we add a new test suite for this functionality. Most our interesting tests are against `KafkaRaftClient`. The tests for `KafkaRaftManager` are more basic and limited. ## clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java: ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Locale; +import java.util.Objects; + +/** + * An endpoint for a raft quorum voter. + */ +@InterfaceStability.Stable +public class RaftVoterEndpoint { +private final String name; +private final String host; +private final int port; +private final String securityProtocol; Review Comment: Did you consider using `o.a.k.c.Endpoints`? This struct has a similar structure to `Endpoints`. During the KIP discussion we agreed to not add security protocol to the `AddRaftVoter` request. Mainly because it is not needed and that there is a requirement that the listener name, security protocol and the protocol configuration must match in all of the voters if they exist.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624788878 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -235,7 +235,6 @@ public void addAll(final List requests) { public void add(final UnsentRequest r) { Objects.requireNonNull(r); -r.setTimer(this.time, this.requestTimeoutMs); Review Comment: It _shouldn't_. My preference is to make data as immutable as possible. Setting the `Timer` on creation vs. after the fact seems cleaner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
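A tiny illustration of the design preference expressed here (supply the timer at construction rather than via a setter), with invented names unrelated to the actual UnsentRequest class:

```java
// Invented example: the deadline is final and supplied at construction time,
// so a request can never be observed in a half-initialized "no timer yet" state.
final class TimedRequest {
    private final long deadlineMs;

    TimedRequest(long nowMs, long timeoutMs) {
        this.deadlineMs = nowMs + timeoutMs;
    }

    boolean expired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}
```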
Re: [PR] MINOR: Use project path instead of name for spotlessApplyModules in Gradle script [kafka]
C0urante merged PR #16177: URL: https://github.com/apache/kafka/pull/16177 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: update / add documentations stream-state-metrics [kafka]
sebastienviale opened a new pull request, #16182: URL: https://github.com/apache/kafka/pull/16182 This change updates the documentation for the all-latency-max, range-latency-avg, and range-latency-max metrics, and adds documentation for prefix-scan. It specifies that the execution time of the all operation is in fact the time the iterator is in use. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching
[ https://issues.apache.org/jira/browse/KAFKA-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851781#comment-17851781 ] Arushi Helms commented on KAFKA-16820: -- Hi [~soarez] Do you have any update on this? > Kafka Broker fails to connect to Kraft Controller with no DNS matching > --- > > Key: KAFKA-16820 > URL: https://issues.apache.org/jira/browse/KAFKA-16820 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Arushi Helms >Priority: Major > Attachments: Screenshot 2024-05-22 at 1.09.11 PM-1.png > > > > We are migrating our Kafka cluster from zookeeper to Kraft mode. We are > running individual brokers and controllers with TLS enabled and IPs are given > for communication. > TLS enabled setup works fine among the brokers and the certificate looks > something like: > {noformat} > Common Name: *.kafka.service.consul > Subject Alternative Names: *.kafka.service.consul, IP > Address:10.87.170.78{noformat} > Note: > * The DNS name for the node does not match the CN but since we are using IPs > as communication, we have provided IPs as SAN. > * Same with the controllers, IPs are given as SAN in the certificate. > * Issue is not related to the migration so just sharing configuration > relevant for the TLS piece. > In the current setup I am running 3 brokers and 3 controllers. > *CONTROLLER:* > Relevant controller configurations from one of the controllers: > {noformat} > KAFKA_CFG_PROCESS_ROLES=controller > KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow > KAFKA_CFG_NODE_ID=6 > KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 > > KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER > KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL > KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL > KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat} > Controller certificate has: > {noformat} > Common Name: *.kafka.service.consul > Subject Alternative Names: *.kafka.service.consul, IP > Address:10.87.170.6{noformat} > > *BROKER:* > Relevant broker configuration from one of the brokers: > {noformat} > KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER > KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL > KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 > > KAFKA_CFG_PROCESS_ROLES=broker > KAFKA_CFG_NODE_ID=3 > KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL > > KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096 > > KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat} > Broker certificate has: > {noformat} > Common Name: *.kafka.service.consul > Subject Alternative Names: *.kafka.service.consul, IP > Address:10.87.170.78{noformat} > > ISSUE 1: > With this setup Kafka broker is failing to connect to the controller, see the > following error: > {noformat} > 2024-05-22 17:53:46,413] ERROR > [broker-2-to-controller-heartbeat-channel-manager]: Request > BrokerRegistrationRequestData(brokerId=2, clusterId='5kztjhJ4SxSu-kdiEYDUow', > incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', > host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', > host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', > host='10.87.170.81', port=9092, securityProtocol=0), > Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, > securityProtocol=1)], 
features=[Feature(name='metadata.version', > minSupportedVersion=1, maxSupportedVersion=19)], rack=null, > isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], > previousBrokerEpoch=-1) failed due to authentication error with controller > (kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException: > SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No > subject alternative DNS name matching > cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found.at > java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131) at > java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:378) > at > java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321) > at > java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:316) > at > java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351) > at > java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226) > at >
Re: [PR] KAFKA-16861: Don't convert to group to classic if the size is larger than group max size. [kafka]
dajac commented on PR #16163: URL: https://github.com/apache/kafka/pull/16163#issuecomment-2145877889 Merged to trunk and to 3.8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Small refactor in TargetAssignmentBuilder [kafka]
dajac commented on PR #16174: URL: https://github.com/apache/kafka/pull/16174#issuecomment-2145877229 Merged to trunk and to 3.8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org