[PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]
kamalcph opened a new pull request, #15793: URL: https://github.com/apache/kafka/pull/15793 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]
kamalcph commented on PR #15787: URL: https://github.com/apache/kafka/pull/15787#issuecomment-2074099558 Opened #15793 to port it to v3.7 branch.
Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]
chia7712 commented on PR #15782: URL: https://github.com/apache/kafka/pull/15782#issuecomment-2074037067 @TaiJuWu Could you please rebase the code to trigger QA again?
Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]
chia7712 merged PR #15774: URL: https://github.com/apache/kafka/pull/15774
Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]
chia7712 commented on PR #15774: URL: https://github.com/apache/kafka/pull/15774#issuecomment-2074035809

```
./gradlew cleanTest \
  :streams:test --tests EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore \
  :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful \
  :storage:test --tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout \
  :connect:runtime:test \
    --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsets \
    --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks \
    --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId \
  :metadata:test \
    --tests QuorumControllerTest.testUnregisterBroker \
    --tests QuorumControllerTest.testConfigurationOperations \
  :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
  :connect:mirror:test \
    --tests IdentityReplicationIntegrationTest.testSyncTopicConfigs \
    --tests MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs \
  :core:test \
    --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithoutDescribeAclViaSubscribe \
    --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaAssign \
    --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithoutDescribeAclViaAssign \
    --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoGroupAcl \
    --tests ConsumerBounceTest.testConsumptionWithBrokerFailures \
    --tests ZkMigrationIntegrationTest.testDualWrite \
    --tests DynamicBrokerReconfigurationTest.testTrustStoreAlter \
    --tests ReplicationQuotasTest.shouldThrottleOldSegments
```

I don't observe any related failures, and these tests pass locally.
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
chia7712 merged PR #15714: URL: https://github.com/apache/kafka/pull/15714
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1577225484 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { +// We will create a new WorkerCoordinator object with a rebalance timeout smaller +// than session timeout. This might not happen in the real world but it makes testing +// easier and the test not flaky. +int smallRebalanceTimeout = 20; +this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, +smallRebalanceTimeout, +heartbeatIntervalMs, +groupId, +Optional.empty(), +retryBackoffMs, +retryBackoffMaxMs, +true); +this.coordinator = new WorkerCoordinator(rebalanceConfig, +logContext, +consumerClient, +new Metrics(time), +"consumer" + groupId, +time, +LEADER_URL, +configStorage, +rebalanceListener, +compatibility, +0); + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { +coordinator.ensureActiveGroup(); +coordinator.poll(0, () -> null); + +// The heartbeat thread is running and keeps sending heartbeat requests. 
+TestUtils.waitForCondition(() -> { +// Rebalance timeout elapses while poll is never invoked causing a poll timeout expiry +coordinator.sendHeartbeatRequest(); +client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); +time.sleep(1); Review Comment: I can try again to validate it
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1577223693 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { +// We will create a new WorkerCoordinator object with a rebalance timeout smaller +// than session timeout. This might not happen in the real world but it makes testing +// easier and the test not flaky. +int smallRebalanceTimeout = 20; +this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, +smallRebalanceTimeout, +heartbeatIntervalMs, +groupId, +Optional.empty(), +retryBackoffMs, +retryBackoffMaxMs, +true); +this.coordinator = new WorkerCoordinator(rebalanceConfig, +logContext, +consumerClient, +new Metrics(time), +"consumer" + groupId, +time, +LEADER_URL, +configStorage, +rebalanceListener, +compatibility, +0); + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { +coordinator.ensureActiveGroup(); +coordinator.poll(0, () -> null); + +// The heartbeat thread is running and keeps sending heartbeat requests. 
+TestUtils.waitForCondition(() -> { +// Rebalance timeout elapses while poll is never invoked causing a poll timeout expiry +coordinator.sendHeartbeatRequest(); +client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); +time.sleep(1); Review Comment: I tried that; it still threw a session timeout once in a while when I ran it 30 times. So, to be on the safe side, I added it.
Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]
FrankYang0529 commented on PR #15773: URL: https://github.com/apache/kafka/pull/15773#issuecomment-2073981282 @showuon thanks for the review and for clarifying the scheduler behavior. I learned a lot.
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
showuon commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1577220797 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java: ## @@ -267,6 +267,18 @@ public String memberId() { return JoinGroupRequest.UNKNOWN_MEMBER_ID; } +@Override +protected void handlePollTimeoutExpiry() { +log.warn("worker poll timeout has expired. This means the time between subsequent calls to poll() " + +"in DistributedHerder tick() method was longer than the configured rebalance.timeout.ms. " + +"If you see this happening consistently, then it can be addressed by either adding more workers " + +"to the connect cluster or by increasing the rebalance.timeout.ms configuration value. Please note that " + +"rebalance.timeout.ms also controls the maximum allowed time for each worker to join the group once a " + +"rebalance has begun so the set value should not be very high"); Review Comment: @gharris1727 @mimaison, could you help review this log output, since you're the Connect experts? Thanks.
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
showuon commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1577219150 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { +// We will create a new WorkerCoordinator object with a rebalance timeout smaller +// than session timeout. This might not happen in the real world but it makes testing +// easier and the test not flaky. +int smallRebalanceTimeout = 20; +this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, +smallRebalanceTimeout, +heartbeatIntervalMs, +groupId, +Optional.empty(), +retryBackoffMs, +retryBackoffMaxMs, +true); +this.coordinator = new WorkerCoordinator(rebalanceConfig, +logContext, +consumerClient, +new Metrics(time), +"consumer" + groupId, +time, +LEADER_URL, +configStorage, +rebalanceListener, +compatibility, +0); + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { +coordinator.ensureActiveGroup(); +coordinator.poll(0, () -> null); + +// The heartbeat thread is running and keeps sending heartbeat requests. 
+TestUtils.waitForCondition(() -> { +// Rebalance timeout elapses while poll is never invoked causing a poll timeout expiry +coordinator.sendHeartbeatRequest(); +client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); +time.sleep(1); Review Comment: If you set `RebalanceTimeout < sessionTimeout`, why do we need the heartbeat request? You should be able to sleep (RebalanceTimeout + 1) directly to get the log. Ex:

```
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
    coordinator.ensureActiveGroup();
    coordinator.poll(0, () -> null);
    time.sleep(smallRebalanceTimeout + 1);
    // The heartbeat thread is running and keeps sending heartbeat requests.
    TestUtils.waitForCondition(() -> logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) &&
        logCaptureAppender.getEvents().stream().anyMatch(e -> e.getMessage().startsWith("worker poll timeout has expired")), ...)
```
Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]
showuon commented on PR #15773: URL: https://github.com/apache/kafka/pull/15773#issuecomment-2073971783 Re-trigger CI build: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15773/5/
Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]
showuon commented on PR #15773: URL: https://github.com/apache/kafka/pull/15773#issuecomment-2073971106

> If we want to use taskRunning to check whether deletionTask is executed, we may need a new value in LogManager class to keep scheduler future object. WDYT?

> I think that makes sense. Or alternatively, we can add another test to test the deleteLogs method that it should not be blocked if fileDeleteDelayMs=0.

Sorry, I was wrong. I checked `ScheduledThreadPoolExecutor#shutdown`: it waits until all the tasks in the queue have completed. So as long as we are sure the task is in the queue, I don't think we need any further verification. That said, no other modification is needed. Thanks.
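The point about queued tasks surviving shutdown can be sketched with plain `java.util.concurrent`. This is a minimal illustration, not Kafka's code: it assumes the JDK's default `ScheduledThreadPoolExecutor` policies (a `KafkaScheduler` may configure its executor differently), and the names `ShutdownDrainSketch` and `runThenShutdown` are hypothetical.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownDrainSketch {
    /**
     * Schedules taskCount one-shot tasks with zero delay, shuts the executor down,
     * and returns how many of the tasks actually ran.
     */
    static int runThenShutdown(int taskCount) {
        // Default policies are assumed here; nothing is tuned on the executor.
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            // Block-bodied lambda so the Runnable overload of schedule() is chosen.
            executor.schedule(() -> { completed.incrementAndGet(); }, 0, TimeUnit.MILLISECONDS);
        }
        // shutdown() rejects new tasks but does not cancel these already-queued one-shot tasks.
        executor.shutdown();
        try {
            // shutdown() itself does not block, so wait explicitly for the queue to drain.
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for termination", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks completed: " + runThenShutdown(3));
    }
}
```

Under these assumptions, a task only needs to be enqueued before shutdown begins; the test does not have to observe it running first.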
Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]
showuon commented on PR #15773: URL: https://github.com/apache/kafka/pull/15773#issuecomment-2073966870

> I think deletionTask must be executed. From LogManager#startup to LogManager#startupWithConfigOverrides is not asynchronous code, so deletionTask must be put into scheduler queue. During scheduler shutdown, new tasks can not join, but previous task will be executed.

Yes, you're right. It'll be enqueued anyway.

> If we want to use taskRunning to check whether deletionTask is executed, we may need a new value in LogManager class to keep scheduler future object. WDYT?

I think that makes sense. Alternatively, we can add another test for the `deleteLogs` method to verify that it is not blocked if `fileDeleteDelayMs=0`.
Re: [PR] MINOR: Store separate output per test case instead of suite [kafka]
github-actions[bot] commented on PR #15258: URL: https://github.com/apache/kafka/pull/15258#issuecomment-2073953728 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]
FrankYang0529 commented on code in PR #15773: URL: https://github.com/apache/kafka/pull/15773#discussion_r1577187165 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -1343,6 +1346,45 @@ class LogManagerTest { assertFalse(f.exists()) } } + + /** + * Test KafkaScheduler can be shutdown when file delete delay is set to 0. + */ + @Test + def testShutdownWithZeroFileDeleteDelayMs(): Unit = { +val tmpLogDir = TestUtils.tempDir() +val tmpProperties = new Properties() +tmpProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0") +val scheduler = new KafkaScheduler(1, true, "log-manager-test") +val tmpLogManager = new LogManager(logDirs = Seq(tmpLogDir).map(_.getAbsoluteFile), + initialOfflineDirs = Array.empty[File], + configRepository = new MockConfigRepository, + initialDefaultConfig = new LogConfig(tmpProperties), + cleanerConfig = new CleanerConfig(false), + recoveryThreadsPerDataDir = 1, + flushCheckMs = 1000L, + flushRecoveryOffsetCheckpointMs = 1L, + flushStartOffsetCheckpointMs = 1L, + retentionCheckMs = 1000L, + maxTransactionTimeoutMs = 5 * 60 * 1000, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + scheduler = scheduler, + time = Time.SYSTEM, + brokerTopicStats = new BrokerTopicStats, + logDirFailureChannel = new LogDirFailureChannel(1), + keepPartitionMetadataFile = true, + interBrokerProtocolVersion = MetadataVersion.latestTesting, + remoteStorageSystemEnable = false, + initialTaskDelayMs = 0) + +scheduler.startup() +tmpLogManager.startup(Set.empty) +val stopLogManager: Executable = () => tmpLogManager.shutdown() +val stopScheduler: Executable = () => scheduler.shutdown() +assertTimeoutPreemptively(Duration.ofMillis(5000), stopLogManager) +assertTimeoutPreemptively(Duration.ofMillis(5000), stopScheduler) Review Comment: I think `deletionTask` must be 
executed. The path from `LogManager#startup` to `LogManager#startupWithConfigOverrides` is not asynchronous, so `deletionTask` must be put into the scheduler queue. During scheduler shutdown, new tasks cannot join, but previously queued tasks will still be executed. If we want to use `taskRunning` to check whether `deletionTask` is executed, we may need a new field in the `LogManager` class to keep the scheduler's future object. WDYT?
Re: [PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]
showuon commented on PR #15787: URL: https://github.com/apache/kafka/pull/15787#issuecomment-2073928975 Re-triggering CI build: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15787/3/
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
showuon merged PR #15748: URL: https://github.com/apache/kafka/pull/15748
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
showuon commented on PR #15748: URL: https://github.com/apache/kafka/pull/15748#issuecomment-2073928078 Failed tests are unrelated.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2073911312 Retriggering CI build: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15616/11/
Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]
showuon commented on code in PR #15773: URL: https://github.com/apache/kafka/pull/15773#discussion_r1577162256 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -1343,6 +1346,45 @@ class LogManagerTest { assertFalse(f.exists()) } } + + /** + * Test KafkaScheduler can be shutdown when file delete delay is set to 0. + */ + @Test + def testShutdownWithZeroFileDeleteDelayMs(): Unit = { +val tmpLogDir = TestUtils.tempDir() +val tmpProperties = new Properties() +tmpProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0") +val scheduler = new KafkaScheduler(1, true, "log-manager-test") +val tmpLogManager = new LogManager(logDirs = Seq(tmpLogDir).map(_.getAbsoluteFile), + initialOfflineDirs = Array.empty[File], + configRepository = new MockConfigRepository, + initialDefaultConfig = new LogConfig(tmpProperties), + cleanerConfig = new CleanerConfig(false), + recoveryThreadsPerDataDir = 1, + flushCheckMs = 1000L, + flushRecoveryOffsetCheckpointMs = 1L, + flushStartOffsetCheckpointMs = 1L, + retentionCheckMs = 1000L, + maxTransactionTimeoutMs = 5 * 60 * 1000, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + scheduler = scheduler, + time = Time.SYSTEM, + brokerTopicStats = new BrokerTopicStats, + logDirFailureChannel = new LogDirFailureChannel(1), + keepPartitionMetadataFile = true, + interBrokerProtocolVersion = MetadataVersion.latestTesting, + remoteStorageSystemEnable = false, + initialTaskDelayMs = 0) + +scheduler.startup() +tmpLogManager.startup(Set.empty) +val stopLogManager: Executable = () => tmpLogManager.shutdown() +val stopScheduler: Executable = () => scheduler.shutdown() +assertTimeoutPreemptively(Duration.ofMillis(5000), stopLogManager) +assertTimeoutPreemptively(Duration.ofMillis(5000), stopScheduler) Review Comment: How could we make sure the 
`kafka-delete-logs` task is already running before we run shutdown? Even though we schedule it with 0 delay, it is still not guaranteed that the task (thread) will be created immediately, right? I saw there's `scheduler#taskRunning` that can be used to verify that. Could we add that? Ex:

```
scheduler.startup()
tmpLogManager.startup(Set.empty)
TestUtils.waitUntilTrue(
  () => scheduler.taskRunning(deletionTask), ...)
```

WDYT?
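The reviewer's concern, that a zero-delay task is not guaranteed to have started by the time shutdown is called, can be illustrated with a latch in plain Java. This is only a sketch under stated assumptions: it uses a bare `ScheduledThreadPoolExecutor` rather than `KafkaScheduler`, and `WaitForTaskStartSketch` and `shutdownWhileTaskInFlight` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class WaitForTaskStartSketch {
    /**
     * Returns true if the task was observed running before shutdown was requested
     * and the executor still terminated cleanly afterwards.
     */
    static boolean shutdownWhileTaskInFlight() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        CountDownLatch started = new CountDownLatch(1);  // task has begun executing
        CountDownLatch release = new CountDownLatch(1);  // test allows the task to finish
        executor.schedule(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, TimeUnit.MILLISECONDS);
        try {
            // Even with zero delay, wait explicitly instead of assuming the task already started.
            boolean running = started.await(5, TimeUnit.SECONDS);
            executor.shutdown();      // requested while the task is still in flight
            release.countDown();      // now let the task complete
            return running && executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown with in-flight task: " + shutdownWhileTaskInFlight());
    }
}
```

The `started` latch plays the role that a `taskRunning`-style check would play in the test: it replaces a timing assumption with an explicit wait.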
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1577073007 ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() { return bootstrapMetadata; } -public NavigableMap brokerNodes() { +public Map brokerNodes() { Review Comment: I found all tests failed... Sorry for wasting your time, I'll be more careful :disappointed:
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1577056133 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,109 +58,141 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { -String[] getArgs(String group, String topic) { -return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), -"--delete-offsets", -"--group", group, -"--topic", topic -}; +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; +private final ClusterInstance clusterInstance; + +private ConsumerGroupCommand.ConsumerGroupService consumerGroupService; +private final Iterable> consumerConfigs; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +this.consumerConfigs = clusterInstance.isKRaftTest() +? Arrays.asList( +new HashMap() {{ Review Comment: Updated it. Thanks.
Re: [PR] KAFKA-16577: New consumer fails with stop within allotted timeout in consumer_test.py system test [kafka]
kirktrue commented on PR #15784: URL: https://github.com/apache/kafka/pull/15784#issuecomment-2073707108 @lucasbru/@cadonna, please review. Thanks!
Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]
vamossagar12 commented on PR #15762: URL: https://github.com/apache/kafka/pull/15762#issuecomment-2073698582 The build got aborted again.
[jira] [Updated] (KAFKA-16608) AsyncKafkaConsumer doesn't honor interrupted thread status on KafkaConsumer.poll(Duration)
[ https://issues.apache.org/jira/browse/KAFKA-16608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16608: -- Summary: AsyncKafkaConsumer doesn't honor interrupted thread status on KafkaConsumer.poll(Duration) (was: AsyncKafkaConsumer doesn't honour interrupted thread status on KafkaConsumer.poll(Duration)) > AsyncKafkaConsumer doesn't honor interrupted thread status on > KafkaConsumer.poll(Duration) > -- > > Key: KAFKA-16608 > URL: https://issues.apache.org/jira/browse/KAFKA-16608 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Minor > > The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in > interrupted state is to throw InterruptException. The AsyncKafkaConsumer > doesn't do this. It only throws that exception if the interruption occurs > while it is waiting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
appchemist commented on PR #15647: URL: https://github.com/apache/kafka/pull/15647#issuecomment-2073683927 @kirktrue Thanks for the heads-up! If you have a moment, please take a look.
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2073680590 @lianetm @chia7712 Thanks for your comment.
Re: [PR] enable test, check if continues failing in CI [kafka]
showuon commented on PR #13953: URL: https://github.com/apache/kafka/pull/13953#issuecomment-2073673074 @Kiriakos1998, could you rebase onto the latest trunk branch and resolve the conflicts? I'd like to see if it still passes.
[jira] [Created] (KAFKA-16609) Update parse_describe_topic to support new topic describe output
Kirk True created KAFKA-16609:
Summary: Update parse_describe_topic to support new topic describe output
Key: KAFKA-16609
URL: https://issues.apache.org/jira/browse/KAFKA-16609
Project: Kafka
Issue Type: Bug
Components: admin, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
Fix For: 3.8.0

It appears that recent changes to the describe topic output have broken the system test's ability to parse the output.

{noformat}
test_id: kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=False.reassign_from_offset_zero=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time: 50.333 seconds

IndexError('list index out of range')
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py", line 175, in test_reassign_partitions
    self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 105, in run_produce_consume_validate
    core_test_action(*args)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py", line 175, in <lambda>
    self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py", line 82, in reassign_partitions
    partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py", line 1400, in parse_describe_topic
    fields = list(map(lambda x: x.split(" ")[1], fields))
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py", line 1400, in <lambda>
    fields = list(map(lambda x: x.split(" ")[1], fields))
IndexError: list index out of range
{noformat}
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
kirktrue commented on PR #15647: URL: https://github.com/apache/kafka/pull/15647#issuecomment-2073652156 @appchemist, thanks for the PR, and sorry for the delayed response! I've taken a first pass but am still working through the unit test changes.
[PR] KAFKA-16217: Stop the abort transaction try loop when closing producers (#15541) [kafka]
CalvinConfluent opened a new pull request, #15792: URL: https://github.com/apache/kafka/pull/15792 This is a mitigation fix for https://issues.apache.org/jira/browse/KAFKA-16217. Exceptions should not block closing the producers. This PR reverts part of the change in #13591. Reviewers: Kirk True, Justine Olshan
[PR] KAFKA-16217: Stop the abort transaction try loop when closing producers (#15541) [kafka]
CalvinConfluent opened a new pull request, #15791: URL: https://github.com/apache/kafka/pull/15791 This is a mitigation fix for https://issues.apache.org/jira/browse/KAFKA-16217. Exceptions should not block closing the producers. This PR reverts part of the change in #13591. Reviewers: Kirk True, Justine Olshan
Re: [PR] Add Integration tests for the the KIP-966 [kafka]
CalvinConfluent commented on PR #15759: URL: https://github.com/apache/kafka/pull/15759#issuecomment-2073486845 @mumrah @artemlivshits Can you help take a look?
[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug
[ https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840232#comment-17840232 ] Matthias J. Sax commented on KAFKA-16584: - Personally, I would prefer to make it configurable. It should be a more or less simple change, but it requires a KIP. > Make log processing summary configurable or debug > - > > Key: KAFKA-16584 > URL: https://issues.apache.org/jira/browse/KAFKA-16584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Andras Hatvani >Priority: Major > Labels: needs-kip, newbie > > Currently *every two minutes for every stream thread* statistics will be > logged on INFO log level. > {code} > 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 > total records, ran 0 punctuators, and committed 0 total tasks since the last > update {code} > This is absolutely unnecessary and even harmful since it fills the logs and > thus storage space with unwanted and useless data. Otherwise the INFO logs > are useful and helpful, therefore it's not an option to raise the log level > to WARN. > Please make the logProcessingSummary > * either to a DEBUG level log or > * make it configurable so that it can be disabled. > This is the relevant code: > https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073
[jira] [Updated] (KAFKA-16584) Make log processing summary configurable or debug
[ https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16584: Labels: needs-kip newbie (was: ) > Make log processing summary configurable or debug > - > > Key: KAFKA-16584 > URL: https://issues.apache.org/jira/browse/KAFKA-16584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Andras Hatvani >Priority: Major > Labels: needs-kip, newbie > > Currently *every two minutes for every stream thread* statistics will be > logged on INFO log level. > {code} > 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 > total records, ran 0 punctuators, and committed 0 total tasks since the last > update {code} > This is absolutely unnecessary and even harmful since it fills the logs and > thus storage space with unwanted and useless data. Otherwise the INFO logs > are useful and helpful, therefore it's not an option to raise the log level > to WARN. > Please make the logProcessingSummary > * either to a DEBUG level log or > * make it configurable so that it can be disabled. > This is the relevant code: > https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16608) AsyncKafkaConsumer doesn't honour interrupted thread status on KafkaConsumer.poll(Duration)
[ https://issues.apache.org/jira/browse/KAFKA-16608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield reassigned KAFKA-16608: Assignee: Andrew Schofield > AsyncKafkaConsumer doesn't honour interrupted thread status on > KafkaConsumer.poll(Duration) > --- > > Key: KAFKA-16608 > URL: https://issues.apache.org/jira/browse/KAFKA-16608 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Minor > > The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in > interrupted state is to throw InterruptException. The AsyncKafkaConsumer > doesn't do this. It only throws that exception if the interruption occurs > while it is waiting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16608) AsyncKafkaConsumer doesn't honour interrupted thread status on KafkaConsumer.poll(Duration)
[ https://issues.apache.org/jira/browse/KAFKA-16608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16608: -- Component/s: consumer > AsyncKafkaConsumer doesn't honour interrupted thread status on > KafkaConsumer.poll(Duration) > --- > > Key: KAFKA-16608 > URL: https://issues.apache.org/jira/browse/KAFKA-16608 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Andrew Schofield >Priority: Minor > > The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in > interrupted state is to throw InterruptException. The AsyncKafkaConsumer > doesn't do this. It only throws that exception if the interruption occurs > while it is waiting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576836597 ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() { return bootstrapMetadata; } -public NavigableMap brokerNodes() { +public Map brokerNodes() { Review Comment: Please ignore this comment: we create all controllers before brokers, so the order of brokers does not matter.
[jira] [Created] (KAFKA-16608) AsyncKafkaConsumer doesn't honour interrupted thread status on KafkaConsumer.poll(Duration)
Andrew Schofield created KAFKA-16608: Summary: AsyncKafkaConsumer doesn't honour interrupted thread status on KafkaConsumer.poll(Duration) Key: KAFKA-16608 URL: https://issues.apache.org/jira/browse/KAFKA-16608 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.8.0 Reporter: Andrew Schofield The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in interrupted state is to throw InterruptException. The AsyncKafkaConsumer doesn't do this. It only throws that exception if the interruption occurs while it is waiting.
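For illustration, the behaviour described in KAFKA-16608 can be sketched in plain Java. This is a minimal sketch and not the AsyncKafkaConsumer code: InterruptAwarePoller and SketchInterruptException are hypothetical names standing in for the consumer and for Kafka's InterruptException, and the sleep stands in for waiting on records. It covers the two cases from the report: the thread is already interrupted when poll is called, and the thread is interrupted while poll is waiting.

```java
import java.time.Duration;

// Hypothetical stand-in for Kafka's unchecked InterruptException (not the real class).
class SketchInterruptException extends RuntimeException {
    SketchInterruptException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical consumer-like class; only the interrupt handling is of interest here.
class InterruptAwarePoller {

    // Returns the number of records fetched, which is always 0 in this sketch.
    int poll(Duration timeout) {
        // Case 1: the calling thread is already interrupted when poll() is entered.
        // isInterrupted() leaves the flag set so the caller can still observe it.
        if (Thread.currentThread().isInterrupted()) {
            throw new SketchInterruptException("interrupted before poll", null);
        }
        try {
            // Stand-in for blocking until records arrive or the timeout expires.
            Thread.sleep(timeout.toMillis());
        } catch (InterruptedException e) {
            // Case 2: the interruption happens while waiting.
            // Restore the flag that sleep() cleared, then surface the unchecked exception.
            Thread.currentThread().interrupt();
            throw new SketchInterruptException("interrupted during poll", e);
        }
        return 0;
    }

    public static void main(String[] args) {
        InterruptAwarePoller poller = new InterruptAwarePoller();
        Thread.currentThread().interrupt(); // simulate a caller that is already interrupted
        try {
            poller.poll(Duration.ofMillis(100));
        } catch (SketchInterruptException e) {
            System.out.println("poll threw: " + e.getMessage());
        } finally {
            Thread.interrupted(); // clear the flag so the demo exits cleanly
        }
    }
}
```

Checking the flag on entry is what distinguishes the two cases: without that first check, a pre-interrupted caller is only noticed if the code happens to reach a blocking call that reacts to interruption.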
Re: [PR] KAFKA-14509: [3/4] Add integration test for consumerGroupDescribe Api [kafka]
riedelmax commented on code in PR #15727: URL: https://github.com/apache/kafka/pull/15727#discussion_r1576821065 ## core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala: ## @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package unit.kafka.server + +import kafka.server.GroupCoordinatorBaseRequestTest +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.utils.TestUtils +import org.apache.kafka.common.ConsumerGroupState +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, DescribedGroup, TopicPartitions} +import org.apache.kafka.common.protocol.ApiKeys +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.{Tag, Timeout} + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) Review Comment: Hey @johnnychhsu Thanks for reviewing :) AFAIK the new consumer group only works with KRAFT clusters. @dajac can you confirm this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]
chia7712 commented on PR #15782: URL: https://github.com/apache/kafka/pull/15782#issuecomment-2073234352 Re-triggering QA since some builds were terminated.
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576751637 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.consumer.group; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +public class ConsumerGroupExecutor implements AutoCloseable { +final int numThreads; +final ExecutorService executor; +final List consumers = new ArrayList<>(); + +public ConsumerGroupExecutor( +String broker, +int numConsumers, +String groupId, +String groupProtocol, +String topic, +String strategy, +Optional remoteAssignor, +Optional customPropsOpt, +boolean syncCommit Review Comment: The `syncCommit` param seems to always be false. If that's the case what's the purpose of it? (is it with the intention of using this same common `ConsumerGroupExecutor` class from the `ConsumerGroupCommandTest` too?) 
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576738178 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.consumer.group; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +public class ConsumerGroupExecutor implements AutoCloseable { +final int numThreads; +final ExecutorService executor; +final List consumers = new ArrayList<>(); + +public ConsumerGroupExecutor( +String broker, +int numConsumers, +String groupId, +String groupProtocol, +String topic, +String strategy, Review Comment: maybe rename this to `assignmentStrategy` for clarity on what it is -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576735920 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.consumer.group; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +public class ConsumerGroupExecutor implements AutoCloseable { +final int numThreads; +final ExecutorService executor; +final List consumers = new ArrayList<>(); + +public ConsumerGroupExecutor( +String broker, +int numConsumers, +String groupId, +String groupProtocol, +String topic, +String strategy, +Optional remoteAssignor, +Optional customPropsOpt, +boolean syncCommit Review Comment: We have 2 well defined group types, classic and consumer, with different creation params, and having them all together seems a bit confusing (ex. 
having assignment strategy along with remote assignor), which is not very intuitive when creating a new instance (and I expect several other test files will do that as they get migrated). So I wonder if it would be clearer to split them into static builders that take only the params they need, like:
```
public static ConsumerGroupExecutor forConsumerGroup(
        String broker,
        int numConsumers,
        String groupId,
        String topic,
        Optional remoteAssignor,
        Optional customPropsOpt,
        boolean syncCommit) {
}

public static ConsumerGroupExecutor forClassicGroup(
        String broker,
        int numConsumers,
        String groupId,
        String topic,
        String strategy,
        Optional customPropsOpt,
        boolean syncCommit) {
}
```
[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840191#comment-17840191 ] Kirk True commented on KAFKA-16217: --- [~calvinliu], I noticed that PR 15541 is merged, but the status of this issue is still "In Progress." The bug lists the fix versions as 3.8.0, 3.7.1, and 3.6.3. Was the change in PR 15541 back-ported to those earlier versions? Thanks! > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.7.0, 3.6.1 >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > Labels: transactions > Fix For: 3.8.0, 3.7.1, 3.6.3 > > > The producer is stuck during the close. It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the
root cause. If the producer is in a bad > transaction state(in my case, the TransactionManager.pendingTransition was > set to commitTransaction and did not get cleaned), then the producer calls > close and tries to abort the existing transaction, the producer will get > stuck in the transaction abortion. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16462) New consumer fails with timeout in security_test.py system test
[ https://issues.apache.org/jira/browse/KAFKA-16462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-16462. --- Resolution: Duplicate > New consumer fails with timeout in security_test.py system test > --- > > Key: KAFKA-16462 > URL: https://issues.apache.org/jira/browse/KAFKA-16462 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{security_test.py}} system test fails with the following error: > {noformat} > test_id: > kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 1 minute 30.885 seconds > TimeoutError('') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/security_test.py", > line 142, in test_client_ssl_endpoint_validation_failure > wait_until(lambda: self.producer_consumer_have_expected_error(error), > timeout_sec=30) > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16461) New consumer fails to consume records in security_test.py system test
[ https://issues.apache.org/jira/browse/KAFKA-16461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16461: -- Description: The {{security_test.py}} system test fails with the following error: {quote} * Consumer failed to consume up to offsets {quote} Affected test: * {{test_client_ssl_endpoint_validation_failure}} Cause: The system test was failing because the {{VerifiableConsumer}} hit a {{NullPointerException}} during startup. The reason for the NPE was an attempt to put a {{null}} as the value of {{--group-remote-assignor}} in the {{Consumer}}'s configuration. was: The {{security_test.py}} system test fails with the following error: {quote} * Consumer failed to consume up to offsets {quote} Affected test: * {{test_client_ssl_endpoint_validation_failure}} > New consumer fails to consume records in security_test.py system test > - > > Key: KAFKA-16461 > URL: https://issues.apache.org/jira/browse/KAFKA-16461 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{security_test.py}} system test fails with the following error: > {quote} > * Consumer failed to consume up to offsets > {quote} > Affected test: > * {{test_client_ssl_endpoint_validation_failure}} > Cause: > The system test was failing because the {{VerifiableConsumer}} hit a > {{NullPointerException}} during startup. The reason for the NPE was an > attempt to put a {{null}} as the value of {{--group-remote-assignor}} in the > {{Consumer}}'s configuration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
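The cause described in KAFKA-16461 is easy to reproduce outside Kafka, because java.util.Properties rejects null values. Below is a minimal sketch, assuming the consumer configuration is collected in a Properties object and that the --group-remote-assignor option maps to a key named group.remote.assignor. Both are assumptions made for illustration; AssignorConfigSketch and buildConsumerProps are hypothetical names, not VerifiableConsumer code.

```java
import java.util.Properties;

// Hypothetical helper; it only illustrates guarding an optional setting against null.
class AssignorConfigSketch {

    // Assumed configuration key for the remote assignor option.
    static final String GROUP_REMOTE_ASSIGNOR = "group.remote.assignor";

    // Builds consumer properties, adding the remote assignor only when one was supplied.
    static Properties buildConsumerProps(String groupId, String remoteAssignor) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        // Properties.put(key, null) throws NullPointerException, so an unset
        // option must be skipped rather than stored as null.
        if (remoteAssignor != null) {
            props.put(GROUP_REMOTE_ASSIGNOR, remoteAssignor);
        }
        return props;
    }

    public static void main(String[] args) {
        // No remote assignor given: the key is simply absent.
        Properties withoutAssignor = buildConsumerProps("test.group", null);
        System.out.println(withoutAssignor.containsKey(GROUP_REMOTE_ASSIGNOR));

        // The unguarded form fails immediately.
        try {
            new Properties().put(GROUP_REMOTE_ASSIGNOR, null);
        } catch (NullPointerException e) {
            System.out.println("unguarded put failed with NullPointerException");
        }
    }
}
```

Leaving the key out lets the client fall back to whatever default applies, which is usually what an unset command-line option is meant to express.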
[jira] [Resolved] (KAFKA-16464) New consumer fails with timeout in replication_replica_failure_test.py system test
[ https://issues.apache.org/jira/browse/KAFKA-16464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-16464. --- Resolution: Duplicate > New consumer fails with timeout in replication_replica_failure_test.py system > test > -- > > Key: KAFKA-16464 > URL: https://issues.apache.org/jira/browse/KAFKA-16464 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{replication_replica_failure_test.py}} system test fails with the > following error: > {noformat} > test_id: > kafkatest.tests.core.replication_replica_failure_test.ReplicationReplicaFailureTest.test_replication_with_replica_failure.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 1 minute 20.972 seconds > TimeoutError('Timed out after 30s while awaiting initial record delivery > of 5 records') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/replication_replica_failure_test.py", > line 97, in test_replication_with_replica_failure > self.await_startup() > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/end_to_end.py", > line 125, in await_startup > (timeout_sec, min_records)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Timed out after 30s while awaiting initial > record delivery of 5 records > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576631689 ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -59,87 +63,66 @@ public Builder setCombined(boolean combined) { } public Builder setNumControllerNodes(int numControllerNodes) { -if (numControllerNodes < 0) { -throw new RuntimeException("Invalid negative value for numControllerNodes"); -} - -while (controllerNodeBuilders.size() > numControllerNodes) { -controllerNodeBuilders.pollFirstEntry(); -} -while (controllerNodeBuilders.size() < numControllerNodes) { -int nextId = startControllerId(); -if (!controllerNodeBuilders.isEmpty()) { -nextId = controllerNodeBuilders.lastKey() + 1; -} -controllerNodeBuilders.put(nextId, -new ControllerNode.Builder(). -setId(nextId)); -} +this.numControllerNodes = numControllerNodes; return this; } public Builder setNumBrokerNodes(int numBrokerNodes) { -return setBrokerNodes(numBrokerNodes, 1); +this.numBrokerNodes = numBrokerNodes; +return this; +} + +public Builder setNumDisksPerBroker(int numDisksPerBroker) { +this.numDisksPerBroker = numDisksPerBroker; +return this; +} + +public Builder setPerBrokerProperties(Map> perBrokerProperties) { +this.perBrokerProperties = Collections.unmodifiableMap( +perBrokerProperties.entrySet().stream() +.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()); +return this; } -public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { +public TestKitNodes build() { +if (numControllerNodes < 0) { +throw new RuntimeException("Invalid negative value for numControllerNodes"); +} if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } -if (disksPerBroker <= 0) { -throw new RuntimeException("Invalid value for disksPerBroker"); -} -while (brokerNodeBuilders.size() > numBrokerNodes) { -brokerNodeBuilders.pollFirstEntry(); -} -while (brokerNodeBuilders.size() < numBrokerNodes) { -int 
nextId = startBrokerId(); -if (!brokerNodeBuilders.isEmpty()) { -nextId = brokerNodeBuilders.lastKey() + 1; -} -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() -.setId(nextId) -.setNumLogDirectories(disksPerBroker); -brokerNodeBuilders.put(nextId, brokerNodeBuilder); +if (numDisksPerBroker <= 0) { +throw new RuntimeException("Invalid value for numDisksPerBroker"); } -return this; -} -public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); -try { -if (clusterId == null) { -clusterId = Uuid.randomUuid(); -} -TreeMap controllerNodes = new TreeMap<>(); -for (ControllerNode.Builder builder : controllerNodeBuilders.values()) { -ControllerNode node = builder. -build(baseDirectory, clusterId, brokerNodeBuilders.containsKey(builder.id())); -if (controllerNodes.put(node.id(), node) != null) { -throw new RuntimeException("Duplicate builder for controller " + node.id()); -} -} -TreeMap brokerNodes = new TreeMap<>(); -for (BrokerNode.Builder builder : brokerNodeBuilders.values()) { -BrokerNode node = builder. -build(baseDirectory, clusterId, controllerNodeBuilders.containsKey(builder.id())); -if (brokerNodes.put(node.id(), node) != null) { -throw new RuntimeException("Duplicate builder for broker " + node.id()); -} -} -return new TestKitNodes(baseDirectory, -clusterId, -bootstrapMetadata, -controllerNodes, -brokerNodes); -} catch (Exception e) { -try { -Files.delete(Paths.get(baseDirectory)); -} catch (Exception x) { -throw new RuntimeException("Failed
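The diff above moves argument validation out of the individual setters and into `build()`. A minimal, Kafka-free sketch of that pattern might look as follows. The class `TestNodesSketch`, its fields, and the default values are assumptions for illustration only; the exception messages mirror those in the diff.

```java
// Hypothetical sketch of "setters only record values, build() validates".
final class TestNodesSketch {
    final int numControllerNodes;
    final int numBrokerNodes;
    final int numDisksPerBroker;

    private TestNodesSketch(int numControllerNodes, int numBrokerNodes, int numDisksPerBroker) {
        this.numControllerNodes = numControllerNodes;
        this.numBrokerNodes = numBrokerNodes;
        this.numDisksPerBroker = numDisksPerBroker;
    }

    static final class Builder {
        // Defaults are assumptions made for this sketch.
        private int numControllerNodes = 0;
        private int numBrokerNodes = 0;
        private int numDisksPerBroker = 1;

        Builder setNumControllerNodes(int numControllerNodes) {
            this.numControllerNodes = numControllerNodes;
            return this;
        }

        Builder setNumBrokerNodes(int numBrokerNodes) {
            this.numBrokerNodes = numBrokerNodes;
            return this;
        }

        Builder setNumDisksPerBroker(int numDisksPerBroker) {
            this.numDisksPerBroker = numDisksPerBroker;
            return this;
        }

        // All checks run once, against the final combination of values.
        TestNodesSketch build() {
            if (numControllerNodes < 0) {
                throw new RuntimeException("Invalid negative value for numControllerNodes");
            }
            if (numBrokerNodes < 0) {
                throw new RuntimeException("Invalid negative value for numBrokerNodes");
            }
            if (numDisksPerBroker <= 0) {
                throw new RuntimeException("Invalid value for numDisksPerBroker");
            }
            return new TestNodesSketch(numControllerNodes, numBrokerNodes, numDisksPerBroker);
        }
    }

    public static void main(String[] args) {
        TestNodesSketch nodes = new Builder()
                .setNumControllerNodes(3)
                .setNumBrokerNodes(4)
                .setNumDisksPerBroker(2)
                .build();
        System.out.println("brokers=" + nodes.numBrokerNodes + ", disks=" + nodes.numDisksPerBroker);

        try {
            new Builder().setNumDisksPerBroker(0).build();
        } catch (RuntimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One consequence of this design is that setters can be called in any order and an invalid intermediate value only matters if it is still present when `build()` runs.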
[jira] [Commented] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs
[ https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840163#comment-17840163 ] Sagar Rao commented on KAFKA-16604: --- [~chia7712], OK, I have assigned it to myself. > Deprecate ConfigDef.ConfigKey constructor from public APIs > -- > > Key: KAFKA-16604 > URL: https://issues.apache.org/jira/browse/KAFKA-16604 > Project: Kafka > Issue Type: Improvement > Reporter: Sagar Rao > Assignee: Sagar Rao > Priority: Major > > Currently, one can create a ConfigKey either by invoking the public constructor > directly and passing it to a ConfigDef object, or by invoking one of the > define methods. Having two ways can get confusing at times. Moreover, it could > lead to errors, as was noticed in KAFKA-16592. > We should ideally expose only one way to users, which IMO should be to > create the objects only through the exposed define methods. This ticket is > about first marking the public constructor of ConfigKey as deprecated and > then eventually making it private.
[jira] [Assigned] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs
[ https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-16604: - Assignee: Sagar Rao > Deprecate ConfigDef.ConfigKey constructor from public APIs > -- > > Key: KAFKA-16604 > URL: https://issues.apache.org/jira/browse/KAFKA-16604 > Project: Kafka > Issue Type: Improvement > Reporter: Sagar Rao > Assignee: Sagar Rao > Priority: Major > > Currently, one can create a ConfigKey either by invoking the public constructor > directly and passing it to a ConfigDef object, or by invoking one of the > define methods. Having two ways can get confusing at times. Moreover, it could > lead to errors, as was noticed in KAFKA-16592. > We should ideally expose only one way to users, which IMO should be to > create the objects only through the exposed define methods. This ticket is > about first marking the public constructor of ConfigKey as deprecated and > then eventually making it private.
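The deprecation path proposed in the ticket can be illustrated with a small stand-alone model. This is a sketch under stated assumptions, not Kafka's `ConfigDef`: the class `MiniConfigDef`, its nested `Key`, and the two-argument `define` method are hypothetical. It shows a constructor marked `@Deprecated` while a single `define` entry point performs the validation that callers of the raw constructor would bypass.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical miniature of a config definition registry; not Kafka's ConfigDef.
final class MiniConfigDef {

    static final class Key {
        final String name;
        final Object defaultValue;

        /**
         * @deprecated create keys through {@link MiniConfigDef#define(String, Object)};
         *             the constructor would later become private.
         */
        @Deprecated
        public Key(String name, Object defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    private final Map<String, Key> keys = new LinkedHashMap<>();

    // The single supported entry point: validates, then registers the key.
    MiniConfigDef define(String name, Object defaultValue) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Configuration name must be non-empty");
        }
        if (keys.containsKey(name)) {
            throw new IllegalArgumentException("Configuration " + name + " is defined twice.");
        }
        keys.put(name, new Key(name, defaultValue));
        return this;
    }

    Object defaultValue(String name) {
        Key key = keys.get(name);
        if (key == null) {
            throw new IllegalArgumentException("Unknown configuration " + name);
        }
        return key.defaultValue;
    }

    public static void main(String[] args) {
        MiniConfigDef def = new MiniConfigDef()
                .define("retries", 3)
                .define("client.id", "");
        System.out.println("retries default: " + def.defaultValue("retries"));
    }
}
```

Keeping the constructor available but deprecated for a release or more gives existing callers a compiler warning before the constructor is finally hidden.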
[jira] [Created] (KAFKA-16607) Update the KIP and metrics implementation to include the new state
José Armando García Sancio created KAFKA-16607: -- Summary: Update the KIP and metrics implementation to include the new state Key: KAFKA-16607 URL: https://issues.apache.org/jira/browse/KAFKA-16607 Project: Kafka Issue Type: Sub-task Components: kraft Reporter: José Armando García Sancio KafkaRaftMetrics exposes a current-state metric that needs to be updated to include the prospective state.
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
frankvicky commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072873129 > @frankvicky @m1a2st It seems your PRs (#15766 and #15779) need a consumer running in the background. Hence, we can consider moving `AbstractConsumerGroupExecutor`/`ConsumerGroupExecutor` to an individual Java file. WDYT? It sounds good.
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072864210 @chia7712's comment makes sense to me, and a heads-up: similar classes are already defined in [ConsumerGroupCommandTest](https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L315), so it is probably sensible to look at all of them and consider consolidating.
[PR] Current working branch for KIP-848 kip [kafka]
lucasbru opened a new pull request, #15789: URL: https://github.com/apache/kafka/pull/15789 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
chia7712 commented on PR #15788: URL: https://github.com/apache/kafka/pull/15788#issuecomment-2072851173 @johnnychhsu Instead of removing them, could you make `MetadataLogConfig` use those helper methods? https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/MetadataLogConfig.scala#L38
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840150#comment-17840150 ] Mickael Maison commented on KAFKA-16606: To me that kind of looks like a bug. Previously JBOD was not supported with KRaft so it was a big deal that it did not quite work. As we're approaching JBOD being production ready, we should ease the rough edges and avoid allowing weird unsupported configurations. As JBOD is not production ready, would we really break anyone's cluster if we prevented using multiple log dirs with KRaft and the version set as < 3.7? I understand some people may disagree but I wonder if it's a discussion worth having. > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 >
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576531833 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -84,17 +125,38 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion()) } - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"), + @ClusterTests(Array( Review Comment: I already added the TODO at the top of this file (L30).
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072841232 @frankvicky @m1a2st It seems your PRs (#15766 and #15779) need a consumer running in the background. Hence, we can consider moving `AbstractConsumerGroupExecutor`/`ConsumerGroupExecutor` to an individual Java file. WDYT?
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576523223 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,440 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static 
java.util.Collections.singleton; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.apache.kafka.common.ConsumerGroupState.EMPTY; +import static org.apache.kafka.common.ConsumerGroupState.STABLE; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; -assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); -} - -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); -String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); -assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +@ExtendWith(value = ClusterTestExtensions.class) 
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +public class DeleteConsumerGroupsTest { +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; +private static final String MISSING_GROUP = "missing.group"; + +private final ClusterInstance cluster; +private final Iterable groupProtocols; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +this.groupProtocols = cluster.isKRaftTest() +? Arrays.asList(CLASSIC, CONSUMER) +:
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840148#comment-17840148 ] Igor Soarez commented on KAFKA-16606: - That's right [~scholzj] . When KIP-858 started, the documentation was already explicit about JBOD support being one of the missing features in KRaft, but the configuration was already allowed, so it didn't seem right to change it at the time, but in hindsight maybe that would've been a better decision. > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 >
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1576516058 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new 
UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private static final int MAX_BUCKET_COUNT = 5; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new
Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]
mfvitale commented on PR #15756: URL: https://github.com/apache/kafka/pull/15756#issuecomment-2072820368 > The `configure()` method on Transformation is only called with configurations provided with their prefix. This cleared my doubt. So in that case it should be clearly declared as an SMT configuration. I'll open a KIP. Thanks for the explanation.
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576508031 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,109 +58,141 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { -String[] getArgs(String group, String topic) { -return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), -"--delete-offsets", -"--group", group, -"--topic", topic -}; +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; +private final ClusterInstance clusterInstance; + +private ConsumerGroupCommand.ConsumerGroupService consumerGroupService; +private final Iterable> consumerConfigs; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +this.consumerConfigs = clusterInstance.isKRaftTest() +? Arrays.asList( +new HashMap() {{ Review Comment: We can use an immutable map and make `createConsumer` create an inner mutable map to collect all configs.
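The review suggestion above (immutable per-protocol maps, with a mutable map created inside the factory method) could be sketched roughly as follows. This is an illustration only, not the code from the PR: the class `ConsumerConfigSketch` and the method `createConsumerConfig` are hypothetical stand-ins for the test's `createConsumer`, and no Kafka classes are used so that the example runs on its own.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: immutable overrides in, fresh mutable config map out.
final class ConsumerConfigSketch {

    // Collects the common settings plus the caller's overrides into a new
    // mutable map, leaving the (possibly immutable) overrides map untouched.
    static Map<String, Object> createConsumerConfig(String bootstrapServers,
                                                    String groupId,
                                                    Map<String, Object> overrides) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("group.id", groupId);
        configs.putAll(overrides);
        return configs;
    }

    public static void main(String[] args) {
        // Immutable per-protocol override, replacing the double-brace HashMap.
        Map<String, Object> classic = Collections.singletonMap("group.protocol", "classic");

        Map<String, Object> merged = createConsumerConfig("localhost:9092", "test.group", classic);
        System.out.println("merged entries: " + merged.size());
        System.out.println("override entries: " + classic.size());
    }
}
```

Besides protecting the shared overrides from accidental mutation, this avoids the anonymous `HashMap` subclass that double-brace initialization creates for every literal.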
Re: [PR] KAFKA-16475: add test for TopicImageNodeTest [kafka]
johnnychhsu commented on PR #15720: URL: https://github.com/apache/kafka/pull/15720#issuecomment-2072812226 Thanks for the review, @cmccabe!
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1576507146 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new 
UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private static final int MAX_BUCKET_COUNT = 5; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new
Re: [PR] KAFKA-14509: [3/4] Add integration test for consumerGroupDescribe Api [kafka]
johnnychhsu commented on code in PR #15727: URL: https://github.com/apache/kafka/pull/15727#discussion_r1576500340 ## core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala: ## @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package unit.kafka.server + +import kafka.server.GroupCoordinatorBaseRequestTest +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.utils.TestUtils +import org.apache.kafka.common.ConsumerGroupState +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, DescribedGroup, TopicPartitions} +import org.apache.kafka.common.protocol.ApiKeys +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.{Tag, Timeout} + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) Review Comment: may i know if we also need to include zk mode for this test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576492165 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -66,17 +69,30 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } -public BrokerNode build( -String baseDirectory, -Uuid clusterId, -boolean combined -) { +public Builder setClusterId(Uuid clusterId) { +this.clusterId = clusterId; +return this; +} + +public Builder setBaseDirectory(String baseDirectory) { +this.baseDirectory = baseDirectory; +return this; +} + +public Builder setCombined(boolean combined) { +this.combined = combined; +return this; +} + +public Builder setPropertyOverrides(Map propertyOverrides) { +this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(propertyOverrides)); +return this; +} + +public BrokerNode build() { if (id == -1) { throw new RuntimeException("You must set the node id."); } -if (incarnationId == null) { -incarnationId = Uuid.randomUuid(); -} List logDataDirectories = IntStream Review Comment: Could you add null check for `baseDirectory`? ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) { } public Builder setNumControllerNodes(int numControllerNodes) { -if (numControllerNodes < 0) { -throw new RuntimeException("Invalid negative value for numControllerNodes"); -} - -while (controllerNodeBuilders.size() > numControllerNodes) { -controllerNodeBuilders.pollFirstEntry(); -} -while (controllerNodeBuilders.size() < numControllerNodes) { -int nextId = startControllerId(); -if (!controllerNodeBuilders.isEmpty()) { -nextId = controllerNodeBuilders.lastKey() + 1; -} -controllerNodeBuilders.put(nextId, -new ControllerNode.Builder(). 
-setId(nextId)); -} +this.numControllerNodes = numControllerNodes; return this; } public Builder setNumBrokerNodes(int numBrokerNodes) { -return setBrokerNodes(numBrokerNodes, 1); +this.numBrokerNodes = numBrokerNodes; +return this; +} + +public Builder setNumDisksPerBroker(int numDisksPerBroker) { +this.numDisksPerBroker = numDisksPerBroker; +return this; +} + +public Builder setPerBrokerProperties(Map> perBrokerProperties) { +this.perBrokerProperties = Collections.unmodifiableMap( +perBrokerProperties.entrySet().stream() +.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()); +return this; } -public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { +public TestKitNodes build() { +if (numControllerNodes < 0) { +throw new RuntimeException("Invalid negative value for numControllerNodes"); +} if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } -if (disksPerBroker <= 0) { -throw new RuntimeException("Invalid value for disksPerBroker"); -} -while (brokerNodeBuilders.size() > numBrokerNodes) { -brokerNodeBuilders.pollFirstEntry(); +if (numDisksPerBroker <= 0) { +throw new RuntimeException("Invalid value for numDisksPerBroker"); } -while (brokerNodeBuilders.size() < numBrokerNodes) { -int nextId = startBrokerId(); -if (!brokerNodeBuilders.isEmpty()) { -nextId = brokerNodeBuilders.lastKey() + 1; -} -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() -.setId(nextId) -.setNumLogDirectories(disksPerBroker); -brokerNodeBuilders.put(nextId, brokerNodeBuilder); -} -return this; -} -public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); Review Comment: We don't need to delete `baseDirectory` since `TestUtils.tempDirectory()` will delete the return folder when terminating. ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -167,11
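The `baseDirectory` null check requested in the review of `BrokerNode.java` above could be sketched as below. This is not the PR's code: `BrokerNodeBuilderSketch` is a hypothetical, stripped-down builder that returns a path string instead of a real `BrokerNode`, and the error message wording is an assumption modelled on the existing "You must set the node id." check.

```java
public class BrokerNodeBuilderSketch {
    private int id = -1;
    private String baseDirectory;

    public BrokerNodeBuilderSketch setId(int id) {
        this.id = id;
        return this;
    }

    public BrokerNodeBuilderSketch setBaseDirectory(String baseDirectory) {
        this.baseDirectory = baseDirectory;
        return this;
    }

    // Validates required fields at build time, mirroring the id check in the diff.
    public String build() {
        if (id == -1) {
            throw new RuntimeException("You must set the node id.");
        }
        if (baseDirectory == null) {
            throw new RuntimeException("You must set the base directory.");
        }
        // Hypothetical result: the real builder would construct a BrokerNode here.
        return baseDirectory + "/broker-" + id;
    }
}
```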
Re: [PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]
johnnychhsu commented on PR #15787: URL: https://github.com/apache/kafka/pull/15787#issuecomment-2072775161 nice fix! crystal clear solution :)
[jira] [Resolved] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jakub Scholz resolved KAFKA-16606. -- Resolution: Not A Problem > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 >
Re: [PR] MINOR: Various cleanups in core [kafka]
chia7712 commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576457731 ## core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala: ## @@ -29,9 +29,9 @@ import scala.jdk.CollectionConverters._ */ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logger) { import LinuxIoMetricsCollector._ - var lastUpdateMs: Long = -1L - var cachedReadBytes:Long = 0L - var cachedWriteBytes:Long = 0L + private var lastUpdateMs: Long = -1L Review Comment: It seems the type declaration is unnecessary since its literal ends with `L` ## core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala: ## @@ -347,7 +347,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { if (isKRaftTest()) { val result = new util.HashMap[Uuid, String]() controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach { Review Comment: how about `controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach((k, v) => result.put(v, k))` ## core/src/main/scala/kafka/server/SharedServer.scala: ## @@ -107,16 +107,16 @@ class SharedServer( @volatile var brokerMetrics: BrokerServerMetrics = _ @volatile var controllerServerMetrics: ControllerMetadataMetrics = _ @volatile var loader: MetadataLoader = _ - val snapshotsDisabledReason = new AtomicReference[String](null) + private val snapshotsDisabledReason = new AtomicReference[String](null) @volatile var snapshotEmitter: SnapshotEmitter = _ - @volatile var snapshotGenerator: SnapshotGenerator = _ - @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _ + @volatile private var snapshotGenerator: SnapshotGenerator = _ + @volatile private var metadataLoaderMetrics: MetadataLoaderMetrics = _ def clusterId: String = metaPropsEnsemble.clusterId().get() - def nodeId: Int = metaPropsEnsemble.nodeId().getAsInt() + def nodeId: Int = metaPropsEnsemble.nodeId().getAsInt - def isUsed(): Boolean = synchronized { + private def isUsed(): 
Boolean = synchronized { Review Comment: How about `isUsed`?
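The `forEach((k, v) => result.put(v, k))` suggestion in the `KafkaServerTestHarness` comment above relies on `java.util.Map.forEach`, which takes a two-argument callback and avoids iterating over `entrySet()`. A small Java sketch of the same key/value inversion is shown here. The class `InvertMapSketch` and the method `invert` are hypothetical names, and the sketch assumes values are unique, as topic IDs would be.

```java
import java.util.HashMap;
import java.util.Map;

public class InvertMapSketch {
    // Builds a value-to-key map; if two keys share a value, the last one wins.
    static <K, V> Map<V, K> invert(Map<K, V> source) {
        Map<V, K> result = new HashMap<>();
        source.forEach((k, v) -> result.put(v, k));
        return result;
    }
}
```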
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
frankvicky commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072734321 Hi @lianetm, @chia7712 Thanks for the suggestions, I have addressed the comments.
[PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
johnnychhsu opened a new pull request, #15788: URL: https://github.com/apache/kafka/pull/15788
Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]
mimaison commented on PR #15756: URL: https://github.com/apache/kafka/pull/15756#issuecomment-2072692296 While it's not directly adding new configurations, it's effectively changing the behavior of `ExtractField` and `InsertField` and making them support new configurations. The `configure()` method on Transformation is only called with configurations provided with their prefix. In this example a user would need to set something like:
```
transforms=my-smt
transforms.my-smt.type=MyTransformation
transforms.my-smt.key.converter.replace.null.with.default=false
```
Also by not defining the configurations explicitly users can't discover them using the `GET /connector-plugins/{plugin-type}/config`.
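To illustrate the prefix point in the comment above: a transformation only sees the keys that were written under its own `transforms.<alias>.` prefix, with that prefix removed. The sketch below imitates this with plain maps. `PrefixedConfigSketch` and `withPrefixStripped` are hypothetical names, and the code is an assumption about the general mechanism, not the Connect runtime's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PrefixedConfigSketch {
    // Keeps only the entries whose key starts with the prefix and strips
    // the prefix from each retained key.
    static Map<String, String> withPrefixStripped(Map<String, String> all, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : all.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

Under this model a worker-level `key.converter.replace.null.with.default` setting would never reach the transformation, which is why the user has to repeat it under the `transforms.my-smt.` prefix.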
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840132#comment-17840132 ] Jakub Scholz commented on KAFKA-16606: -- [~soarez] That sounds like without using the 3.7-IV2 metadata JBOD in Kraft works fine until it fails and then it might cause problems. So I wonder if it would have been better to protect the users from that situation. But it sounds like it was an intentional decision. And as I said, changing it now might be anyway a bit tricky as it might break someone's cluster after upgrading to 3.7.1 / 3.8.0. So I guess it sounds good as it is and I will close this. Thanks for the explanation. > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 >
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576432963 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -121,16 +146,16 @@ public BrokerNode build( private final boolean combined; private final Map propertyOverrides; -BrokerNode( +private BrokerNode( Uuid incarnationId, MetaPropertiesEnsemble initialMetaPropertiesEnsemble, boolean combined, Map propertyOverrides ) { -this.incarnationId = incarnationId; -this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble; Review Comment: I made `logDataDirectories()` return `Set` instead of `List` to get rid of the redundant `ArrayList` wrapping.
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576432135 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +256,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: Updated it and also added empty map for zk, or zk will not be tested.
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576422765 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -959,7 +960,7 @@ class ZkAdminManager(val config: KafkaConfig, } else if (requestStatus.mechanism == Some(ScramMechanism.UNKNOWN)) { (requestStatus.user, unknownScramMechanismMsg) Review Comment: It looks weird to me but it seems `contains()` is Scala idiomatic, so I made the change
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576420731 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -144,7 +144,7 @@ case class LogReadResult(info: FetchDataInfo, def withEmptyFetchInfo: LogReadResult = Review Comment: You're right, this is unused -> removed
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576409478 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840125#comment-17840125 ] Igor Soarez commented on KAFKA-16606: - Thanks for bringing this to my attention [~mimaison]. Hi [~scholzj], thanks for pointing this out. I think there's some confusion here with a JBOD configuration being +allowed+ vs being +supported+ in KRaft. In terms of just reading and writing data to multiple log directories, as long as those directories are always available, there's nothing special about KRaft that would require changes compared with ZK mode. What is enabled with 3.7 is the handling of failed log directories. You'll find that partitions don't get new leaders elected – becoming indefinitely unavailable – if the log directory for the leader replica fails but the broker stays alive. If a single directory is configured and it becomes unavailable the broker shuts down, as there is no point in continuing to run without access to storage. When it shuts down the controller becomes aware of that – via an ephemeral znode in ZK mode, or via missing heartbeats in KRaft – and it will re-elect leaders for partitions that were led by the broker. When multiple directories are configured, it is critical to have a separate mechanism to let the controller know there is a partial failure – the broker is still alive and operational on the remaining log dirs, but any partitions on the directory that failed need a leadership and ISR update. In ZK mode that was handled by notifying the controller via a znode, so an alternative solution was required for KRaft. You can find the details in [KIP-858|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]. Let me know if that makes sense. 
> JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 >
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576385492 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576392526 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]
nikramakrishnan commented on PR #15241: URL: https://github.com/apache/kafka/pull/15241#issuecomment-2072530250 Bump! @satishd @kamalcph can we get this review going? Thanks!
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576386813 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +256,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: We can change the parameter type from `Properties` to `Map`. With that change, we no longer need to create so many `Properties` objects in each test case.
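A minimal sketch of the `Properties`-to-`Map` suggestion above, assuming a hypothetical helper named `consumerConfig` (it is not part of the PR). It builds the shared defaults once and lets each test pass only its overrides; the resulting map could then be handed to the `KafkaConsumer(Map<String, Object>)` constructor. The sketch stops at building the map so that it runs without the Kafka client on the classpath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConsumerConfigSketch {

    // Hypothetical helper: shared defaults plus per-test overrides.
    // The keys are standard consumer configuration names.
    public static Map<String, Object> consumerConfig(String bootstrapServers,
                                                     Map<String, Object> overrides) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Overrides win over the defaults above.
        config.putAll(overrides);
        return config;
    }

    public static void main(String[] args) {
        // A test case only states what differs from the defaults.
        Map<String, Object> config = consumerConfig(
                "localhost:9092",
                Collections.singletonMap("group.id", "test.group"));
        System.out.println(config.get("group.id"));
    }
}
```

With a helper of this shape, a test case passes a one-entry map instead of constructing and populating a fresh `Properties` object.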
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576375031 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -2192,8 +2192,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @ValueSource(strings = Array("zk", "kraft")) def testLongTopicNames(quorum: String): Unit = { val client = Admin.create(createConfig) -val longTopicName = String.join("", Collections.nCopies(249, "x")); -val invalidTopicName = String.join("", Collections.nCopies(250, "x")); +val longTopicName = String.join("", Collections.nCopies(249, "x")) Review Comment: Small Scala suggestion here (feel free to ignore): we can use `List.fill(249)("x").mkString("")` instead of Java's `String.join` and `Collections.nCopies`.
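For reference, a small Java sketch of what the expression under discussion produces. The class and method names are illustrative only. It builds the same 249-character string with `String.join` plus `Collections.nCopies` (the form in the diff) and with a plain `StringBuilder` loop, and checks that the two agree. The Scala alternative in the comment, `List.fill(249)("x").mkString("")`, yields the same value.

```java
import java.util.Collections;

public class RepeatSketch {

    // The form used in the test: join n copies of "x" with an empty separator.
    public static String viaJoin(int n) {
        return String.join("", Collections.nCopies(n, "x"));
    }

    // An equivalent loop, shown only to cross-check the result.
    public static String viaBuilder(int n) {
        StringBuilder sb = new StringBuilder(n);
        for (int i = 0; i < n; i++) {
            sb.append('x');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String longTopicName = viaJoin(249);     // the length the test treats as valid
        String invalidTopicName = viaJoin(250);  // one character longer
        System.out.println(longTopicName.length());
        System.out.println(invalidTopicName.length());
        System.out.println(longTopicName.equals(viaBuilder(249)));
    }
}
```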
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
KevinZTW commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1576372796 ## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; + +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class PartitionMetadataFileTest { +private final File dir = TestUtils.tempDirectory(); +private final File file = PartitionMetadataFile.newFile(dir); + +@Test +public void testSetRecordWithDifferentTopicId() { +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +partitionMetadataFile.record(topicId); +Uuid differentTopicId = Uuid.randomUuid(); +assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId)); +} + +@Test +public void testSetRecordWithSameTopicId() { +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +partitionMetadataFile.record(topicId); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); Review Comment: Sorry, I missed this one. I just updated the PR as suggested!
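The two tests quoted above pin down one contract: recording the same topic ID twice is harmless, while recording a different one is rejected. The toy class below is a stand-in written for illustration, not Kafka's `PartitionMetadataFile`; it uses `java.util.UUID` and `IllegalStateException` in place of Kafka's `Uuid` and `InconsistentTopicIdException` so that it runs on its own.

```java
import java.util.UUID;

public class TopicIdRecordSketch {

    // The topic ID recorded so far, or null if none has been recorded yet.
    private UUID recorded;

    // Accepts the first ID and any repeat of it; rejects a different ID.
    public void record(UUID topicId) {
        if (recorded != null && !recorded.equals(topicId)) {
            throw new IllegalStateException(
                    "Tried to record topic ID " + topicId
                            + " but " + recorded + " is already recorded");
        }
        recorded = topicId;
    }

    public static void main(String[] args) {
        TopicIdRecordSketch sketch = new TopicIdRecordSketch();
        UUID topicId = UUID.randomUUID();
        sketch.record(topicId);
        sketch.record(topicId); // same ID again: no exception
        try {
            sketch.record(UUID.randomUUID()); // different ID: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```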
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840118#comment-17840118 ] Mickael Maison commented on KAFKA-16606: cc [~soarez] > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 >
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576366593 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: Makes sense, don't worry about it. We are moving more and more from Scala to Java anyway, so these will get resolved over time.
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576363327 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: That's just my guesstimate, but I expect it to be large. I mostly focused on the low-hanging fruit here.
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576360092 ## core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala: ## @@ -52,8 +51,8 @@ object ZooKeeperClient { * @param sessionTimeoutMs session timeout in milliseconds * @param connectionTimeoutMs connection timeout in milliseconds * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking. + * @param clientConfig ZooKeeper client configuration, for TLS configs if desired Review Comment: Nice. I believe we might also need to update the Javadoc for `registerStateChangeHandler` and `unregisterStateChangeHandler` in the same file.
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576358398 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -300,7 +300,7 @@ class ReplicaManager(val config: KafkaConfig, protected val allPartitions = new Pool[TopicPartition, HostedPartition]( Review Comment: Ideally we should declare types for all public and protected fields but this is a huge change. Also while it's useful in some cases, in many I find it's not adding much value. In this specific example I even find it annoying as you get: ``` protected val allPartitions: Pool[TopicPartition, HostedPartition] = new Pool[TopicPartition, HostedPartition]( ```
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576351984 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: Yeah, in Scala braces are not required around multi-line `case` bodies. I've not made this change because braces are required in Java and we have the braces in Scala all over the code base. Changing this is probably a >500 line diff.
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on PR #15786: URL: https://github.com/apache/kafka/pull/15786#issuecomment-2072445566 We also have a couple of out-of-date parameters in the Javadocs; we can either fix them here or in a follow-up PR: - https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L150 - https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L270
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576354443 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: Ouch, >500 is too much. Well, they will get cleaned up as we move more Scala to Java :D
[jira] [Created] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
Jakub Scholz created KAFKA-16606: Summary: JBOD support in KRaft does not seem to be gated by the metadata version Key: KAFKA-16606 URL: https://issues.apache.org/jira/browse/KAFKA-16606 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Jakub Scholz JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka [source code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. However, it seems to be possible to run KRaft cluster with JBOD even with older metadata versions such as {{{}3.6{}}}. For example, I have a cluster using the {{3.6}} metadata version: {code:java} bin/kafka-features.sh --bootstrap-server localhost:9092 describe Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 {code} Yet a KRaft cluster with JBOD seems to run fine: {code:java} bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe Querying brokers for log directories information Received log directory information from brokers 2000,3000,1000