[PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]

2024-04-23 Thread via GitHub


kamalcph opened a new pull request, #15793:
URL: https://github.com/apache/kafka/pull/15793

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]

2024-04-23 Thread via GitHub


kamalcph commented on PR #15787:
URL: https://github.com/apache/kafka/pull/15787#issuecomment-2074099558

   Opened #15793 to port it to the v3.7 branch.





Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15782:
URL: https://github.com/apache/kafka/pull/15782#issuecomment-2074037067

   @TaiJuWu Could you please rebase the code to trigger QA again?
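
   For context, the change named in this PR's title swaps a system-property lookup for the equivalent JDK helper that has existed since Java 7. A minimal before/after sketch (illustrative only, not the PR's actual diff):

   ```
   // Before: fetches the platform line separator via a property lookup.
   String before = System.getProperty("line.separator");
   // After: the equivalent, more direct JDK helper (Java 7+).
   String after = java.lang.System.lineSeparator();
   // Both return e.g. "\n" on Linux/macOS and "\r\n" on Windows.
   ```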





Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-23 Thread via GitHub


chia7712 merged PR #15774:
URL: https://github.com/apache/kafka/pull/15774





Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15774:
URL: https://github.com/apache/kafka/pull/15774#issuecomment-2074035809

   ```
   ./gradlew cleanTest \
     :streams:test --tests EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore \
     :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful \
     :storage:test --tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout \
     :connect:runtime:test \
       --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsets \
       --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks \
       --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId \
     :metadata:test --tests QuorumControllerTest.testUnregisterBroker --tests QuorumControllerTest.testConfigurationOperations \
     :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
     :connect:mirror:test --tests IdentityReplicationIntegrationTest.testSyncTopicConfigs \
       --tests MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs \
     :core:test \
       --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithoutDescribeAclViaSubscribe \
       --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaAssign \
       --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithoutDescribeAclViaAssign \
       --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoGroupAcl \
       --tests ConsumerBounceTest.testConsumptionWithBrokerFailures \
       --tests ZkMigrationIntegrationTest.testDualWrite \
       --tests DynamicBrokerReconfigurationTest.testTrustStoreAlter \
       --tests ReplicationQuotasTest.shouldThrottleOldSegments
   ```
   I don't observe any related failures, and they pass on my local machine.





Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-23 Thread via GitHub


chia7712 merged PR #15714:
URL: https://github.com/apache/kafka/pull/15714





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-23 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577225484


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }

+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+    // We will create a new WorkerCoordinator object with a rebalance timeout smaller
+    // than session timeout. This might not happen in the real world but it makes testing
+    // easier and the test not flaky.
+    int smallRebalanceTimeout = 20;
+    this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+        smallRebalanceTimeout,
+        heartbeatIntervalMs,
+        groupId,
+        Optional.empty(),
+        retryBackoffMs,
+        retryBackoffMaxMs,
+        true);
+    this.coordinator = new WorkerCoordinator(rebalanceConfig,
+        logContext,
+        consumerClient,
+        new Metrics(time),
+        "consumer" + groupId,
+        time,
+        LEADER_URL,
+        configStorage,
+        rebalanceListener,
+        compatibility,
+        0);
+
+    when(configStorage.snapshot()).thenReturn(configState1);
+
+    client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+    coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+    client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+    client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+        Collections.singletonList(taskId1x0), Errors.NONE));
+
+    try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+        coordinator.ensureActiveGroup();
+        coordinator.poll(0, () -> null);
+
+        // The heartbeat thread is running and keeps sending heartbeat requests.
+        TestUtils.waitForCondition(() -> {
+            // Rebalance timeout elapses while poll is never invoked, causing a poll timeout expiry
+            coordinator.sendHeartbeatRequest();
+            client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
+            time.sleep(1);

Review Comment:
   I can try again to validate it






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-23 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577223693


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }

+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+    // We will create a new WorkerCoordinator object with a rebalance timeout smaller
+    // than session timeout. This might not happen in the real world but it makes testing
+    // easier and the test not flaky.
+    int smallRebalanceTimeout = 20;
+    this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+        smallRebalanceTimeout,
+        heartbeatIntervalMs,
+        groupId,
+        Optional.empty(),
+        retryBackoffMs,
+        retryBackoffMaxMs,
+        true);
+    this.coordinator = new WorkerCoordinator(rebalanceConfig,
+        logContext,
+        consumerClient,
+        new Metrics(time),
+        "consumer" + groupId,
+        time,
+        LEADER_URL,
+        configStorage,
+        rebalanceListener,
+        compatibility,
+        0);
+
+    when(configStorage.snapshot()).thenReturn(configState1);
+
+    client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+    coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+    client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+    client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+        Collections.singletonList(taskId1x0), Errors.NONE));
+
+    try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+        coordinator.ensureActiveGroup();
+        coordinator.poll(0, () -> null);
+
+        // The heartbeat thread is running and keeps sending heartbeat requests.
+        TestUtils.waitForCondition(() -> {
+            // Rebalance timeout elapses while poll is never invoked, causing a poll timeout expiry
+            coordinator.sendHeartbeatRequest();
+            client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
+            time.sleep(1);

Review Comment:
   I tried that; it still threw a session timeout once in a while when I ran it 30 times. So, to be on the safe side, I added it.






Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on PR #15773:
URL: https://github.com/apache/kafka/pull/15773#issuecomment-2073981282

   @showuon thanks for the review and for clarifying the scheduler behavior. I learned a lot!





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-23 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577220797


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }

+@Override
+protected void handlePollTimeoutExpiry() {
+    log.warn("worker poll timeout has expired. This means the time between subsequent calls to poll() " +
+        "in DistributedHerder tick() method was longer than the configured rebalance.timeout.ms. " +
+        "If you see this happening consistently, then it can be addressed by either adding more workers " +
+        "to the connect cluster or by increasing the rebalance.timeout.ms configuration value. Please note that " +
+        "rebalance.timeout.ms also controls the maximum allowed time for each worker to join the group once a " +
+        "rebalance has begun so the set value should not be very high");

Review Comment:
   @gharris1727 @mimaison, could you help review this log output since you're the connect experts? Thanks.







Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-23 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577219150


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }

+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+    // We will create a new WorkerCoordinator object with a rebalance timeout smaller
+    // than session timeout. This might not happen in the real world but it makes testing
+    // easier and the test not flaky.
+    int smallRebalanceTimeout = 20;
+    this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+        smallRebalanceTimeout,
+        heartbeatIntervalMs,
+        groupId,
+        Optional.empty(),
+        retryBackoffMs,
+        retryBackoffMaxMs,
+        true);
+    this.coordinator = new WorkerCoordinator(rebalanceConfig,
+        logContext,
+        consumerClient,
+        new Metrics(time),
+        "consumer" + groupId,
+        time,
+        LEADER_URL,
+        configStorage,
+        rebalanceListener,
+        compatibility,
+        0);
+
+    when(configStorage.snapshot()).thenReturn(configState1);
+
+    client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+    coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+    client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+    client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+        Collections.singletonList(taskId1x0), Errors.NONE));
+
+    try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+        coordinator.ensureActiveGroup();
+        coordinator.poll(0, () -> null);
+
+        // The heartbeat thread is running and keeps sending heartbeat requests.
+        TestUtils.waitForCondition(() -> {
+            // Rebalance timeout elapses while poll is never invoked, causing a poll timeout expiry
+            coordinator.sendHeartbeatRequest();
+            client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
+            time.sleep(1);

Review Comment:
   If you set `RebalanceTimeout < sessionTimeout`, why do we need the heartBeatRequest? You should be able to sleep (RebalanceTimeout + 1) directly to get the log. Ex:
   
   ```
   try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
       coordinator.ensureActiveGroup();
       coordinator.poll(0, () -> null);
   
       time.sleep(smallRebalanceTimeout + 1);
   
       // The heartbeat thread is running and keeps sending heartbeat requests.
       TestUtils.waitForCondition(() -> logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) &&
           logCaptureAppender.getEvents().stream().anyMatch(e -> e.getMessage().startsWith("worker poll timeout has expired")), ...)
   ```






Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]

2024-04-23 Thread via GitHub


showuon commented on PR #15773:
URL: https://github.com/apache/kafka/pull/15773#issuecomment-2073971783

   Re-trigger CI build: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15773/5/





Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]

2024-04-23 Thread via GitHub


showuon commented on PR #15773:
URL: https://github.com/apache/kafka/pull/15773#issuecomment-2073971106

   > If we want to use taskRunning to check whether deletionTask is executed, we may need a new value in LogManager class to keep scheduler future object. WDYT?
   
   > I think that makes sense. Or alternatively, we can add another test to test the deleteLogs method that it should not be blocked if fileDeleteDelayMs=0.
   
   Sorry, I was wrong. I was checking `ScheduledThreadPoolExecutor#shutdown`; it'll wait until all the tasks in the queue are completed. So, as long as we are sure the task is in the queue, I think we don't need further verification anymore. That said, no other modification is needed. Thanks.
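
   The JDK behavior referenced here can be demonstrated standalone; a minimal sketch, assuming the default executeExistingDelayedTasksAfterShutdownPolicy (which is true):

   ```
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   public class ShutdownDemo {
       public static void main(String[] args) throws InterruptedException {
           ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
           // A one-shot delayed task enqueued before shutdown is requested.
           pool.schedule(() -> System.out.println("queued task still ran"), 100, TimeUnit.MILLISECONDS);
           // shutdown() rejects new tasks, but the already-queued delayed task
           // above still executes before the pool terminates (default policy).
           pool.shutdown();
           pool.awaitTermination(5, TimeUnit.SECONDS);
       }
   }
   ```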





Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]

2024-04-23 Thread via GitHub


showuon commented on PR #15773:
URL: https://github.com/apache/kafka/pull/15773#issuecomment-2073966870

   > I think deletionTask must be executed. From LogManager#startup to LogManager#startupWithConfigOverrides is not asynchronous code, so deletionTask must be put into scheduler queue. During scheduler shutdown, new tasks can not join, but previous task will be executed.
   
   Yes, you're right. It'll be enqueued anyway.
   
   > If we want to use taskRunning to check whether deletionTask is executed, we may need a new value in LogManager class to keep scheduler future object. WDYT?
   
   I think that makes sense. Or alternatively, we can add another test to verify that the `deleteLogs` method is not blocked if `fileDeleteDelayMs=0`.





Re: [PR] MINOR: Store separate output per test case instead of suite [kafka]

2024-04-23 Thread via GitHub


github-actions[bot] commented on PR #15258:
URL: https://github.com/apache/kafka/pull/15258#issuecomment-2073953728

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15773:
URL: https://github.com/apache/kafka/pull/15773#discussion_r1577187165


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -1343,6 +1346,45 @@ class LogManagerTest {
   assertFalse(f.exists())
 }
   }
+
+  /**
+   * Test KafkaScheduler can be shutdown when file delete delay is set to 0.
+   */
+  @Test
+  def testShutdownWithZeroFileDeleteDelayMs(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpProperties = new Properties()
+    tmpProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0")
+    val scheduler = new KafkaScheduler(1, true, "log-manager-test")
+    val tmpLogManager = new LogManager(logDirs = Seq(tmpLogDir).map(_.getAbsoluteFile),
+      initialOfflineDirs = Array.empty[File],
+      configRepository = new MockConfigRepository,
+      initialDefaultConfig = new LogConfig(tmpProperties),
+      cleanerConfig = new CleanerConfig(false),
+      recoveryThreadsPerDataDir = 1,
+      flushCheckMs = 1000L,
+      flushRecoveryOffsetCheckpointMs = 1L,
+      flushStartOffsetCheckpointMs = 1L,
+      retentionCheckMs = 1000L,
+      maxTransactionTimeoutMs = 5 * 60 * 1000,
+      producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
+      producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      scheduler = scheduler,
+      time = Time.SYSTEM,
+      brokerTopicStats = new BrokerTopicStats,
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      keepPartitionMetadataFile = true,
+      interBrokerProtocolVersion = MetadataVersion.latestTesting,
+      remoteStorageSystemEnable = false,
+      initialTaskDelayMs = 0)
+
+    scheduler.startup()
+    tmpLogManager.startup(Set.empty)
+    val stopLogManager: Executable = () => tmpLogManager.shutdown()
+    val stopScheduler: Executable = () => scheduler.shutdown()
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopLogManager)
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopScheduler)

Review Comment:
   I think `deletionTask` must be executed. From `LogManager#startup` to `LogManager#startupWithConfigOverrides` there is no asynchronous code, so `deletionTask` must be put into the scheduler queue. During scheduler shutdown, new tasks cannot join, but previously queued tasks will be executed.
   
   If we want to use `taskRunning` to check whether `deletionTask` is executed, we may need a new field in the LogManager class to keep the scheduler future object. WDYT?






Re: [PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]

2024-04-23 Thread via GitHub


showuon commented on PR #15787:
URL: https://github.com/apache/kafka/pull/15787#issuecomment-2073928975

   Re-triggering CI build: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15787/3/





Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-23 Thread via GitHub


showuon merged PR #15748:
URL: https://github.com/apache/kafka/pull/15748





Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-23 Thread via GitHub


showuon commented on PR #15748:
URL: https://github.com/apache/kafka/pull/15748#issuecomment-2073928078

   Failed tests are unrelated.





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-23 Thread via GitHub


showuon commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2073911312

   Retriggering CI build : 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15616/11/





Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]

2024-04-23 Thread via GitHub


showuon commented on code in PR #15773:
URL: https://github.com/apache/kafka/pull/15773#discussion_r1577162256


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -1343,6 +1346,45 @@ class LogManagerTest {
   assertFalse(f.exists())
 }
   }
+
+  /**
+   * Test KafkaScheduler can be shutdown when file delete delay is set to 0.
+   */
+  @Test
+  def testShutdownWithZeroFileDeleteDelayMs(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpProperties = new Properties()
+    tmpProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0")
+    val scheduler = new KafkaScheduler(1, true, "log-manager-test")
+    val tmpLogManager = new LogManager(logDirs = Seq(tmpLogDir).map(_.getAbsoluteFile),
+      initialOfflineDirs = Array.empty[File],
+      configRepository = new MockConfigRepository,
+      initialDefaultConfig = new LogConfig(tmpProperties),
+      cleanerConfig = new CleanerConfig(false),
+      recoveryThreadsPerDataDir = 1,
+      flushCheckMs = 1000L,
+      flushRecoveryOffsetCheckpointMs = 1L,
+      flushStartOffsetCheckpointMs = 1L,
+      retentionCheckMs = 1000L,
+      maxTransactionTimeoutMs = 5 * 60 * 1000,
+      producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
+      producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      scheduler = scheduler,
+      time = Time.SYSTEM,
+      brokerTopicStats = new BrokerTopicStats,
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      keepPartitionMetadataFile = true,
+      interBrokerProtocolVersion = MetadataVersion.latestTesting,
+      remoteStorageSystemEnable = false,
+      initialTaskDelayMs = 0)
+
+    scheduler.startup()
+    tmpLogManager.startup(Set.empty)
+    val stopLogManager: Executable = () => tmpLogManager.shutdown()
+    val stopScheduler: Executable = () => scheduler.shutdown()
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopLogManager)
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopScheduler)

Review Comment:
   How could we make sure the `kafka-delete-logs` task is already running before we run shutdown? Even though we schedule it with 0 delay, it is still not guaranteed the task (thread) will be created immediately, right? I saw there's `scheduler#taskRunning` that can be used to verify that. Could we add that? Ex:
   
   ```
   scheduler.startup()
   tmpLogManager.startup(Set.empty)
   TestUtils.waitUntilTrue(() => scheduler.taskRunning(deletionTask), ...)
   ```
   
   WDYT?








Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1577073007


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() {
 return bootstrapMetadata;
 }
 
-public NavigableMap brokerNodes() {
+public Map brokerNodes() {

Review Comment:
   I found all tests failed... Sorry for wasting your time, I'll be more careful :disappointed:






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1577056133


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,109 +58,141 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest {
-    String[] getArgs(String group, String topic) {
-        return new String[] {
-            "--bootstrap-server", bootstrapServers(listenerName()),
-            "--delete-offsets",
-            "--group", group,
-            "--topic", topic
-        };
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+    @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true")
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+    public static final String TOPIC = "foo";
+    public static final String GROUP = "test.group";
+    private final ClusterInstance clusterInstance;
+
+    private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
+    private final Iterable> consumerConfigs;
+
+    DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+        this.consumerConfigs = clusterInstance.isKRaftTest()
+            ? Arrays.asList(
+                new HashMap() {{

Review Comment:
   Updated it. Thanks.
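
   On the pattern in the diff above: the `new HashMap() {{ ... }}` double-brace initializer is usually replaced with an explicit build, which avoids creating an anonymous subclass. A minimal sketch of the common alternative (hypothetical key and value, not the PR's actual code):

   ```
   Map<String, Object> config = new HashMap<>();
   config.put("group.protocol", "classic"); // hypothetical entry for illustration
   List<Map<String, Object>> consumerConfigs = Collections.singletonList(config);
   ```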






Re: [PR] KAFKA-16577: New consumer fails with stop within allotted timeout in consumer_test.py system test [kafka]

2024-04-23 Thread via GitHub


kirktrue commented on PR #15784:
URL: https://github.com/apache/kafka/pull/15784#issuecomment-2073707108

   @lucasbru / @cadonna, please review. Thanks!





Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]

2024-04-23 Thread via GitHub


vamossagar12 commented on PR #15762:
URL: https://github.com/apache/kafka/pull/15762#issuecomment-2073698582

   The build got aborted again 🤔





[jira] [Updated] (KAFKA-16608) AsyncKafkaConsumer doesn't honor interrupted thread status on KafkaConsumer.poll(Duration)

2024-04-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16608:
--
Summary: AsyncKafkaConsumer doesn't honor interrupted thread status on 
KafkaConsumer.poll(Duration)  (was: AsyncKafkaConsumer doesn't honour 
interrupted thread status on KafkaConsumer.poll(Duration))

> AsyncKafkaConsumer doesn't honor interrupted thread status on 
> KafkaConsumer.poll(Duration)
> --
>
> Key: KAFKA-16608
> URL: https://issues.apache.org/jira/browse/KAFKA-16608
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Minor
>
> The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in 
> interrupted state is to throw InterruptException. The AsyncKafkaConsumer 
> doesn't do this. It only throws that exception if the interruption occurs 
> while it is waiting.
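
A minimal sketch of the contract described above (illustrative only; {{consumer}} is a hypothetical KafkaConsumer instance, not the actual fix):

{code}
Thread.currentThread().interrupt();        // caller arrives with interrupt status already set
try {
    consumer.poll(Duration.ofMillis(100)); // the classic consumer throws InterruptException on entry
} catch (org.apache.kafka.common.errors.InterruptException e) {
    // Per the report, AsyncKafkaConsumer only throws if the interruption
    // occurs while it is actually waiting, rather than on entry.
}
{code}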





Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-23 Thread via GitHub


appchemist commented on PR #15647:
URL: https://github.com/apache/kafka/pull/15647#issuecomment-2073683927

   @kirktrue Thanks for the heads-up!
   If you have a moment, please take a look





Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-23 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2073680590

   @lianetm @chia7712  Thanks for your comment.





Re: [PR] enable test, check if continues failing in CI [kafka]

2024-04-23 Thread via GitHub


showuon commented on PR #13953:
URL: https://github.com/apache/kafka/pull/13953#issuecomment-2073673074

   @Kiriakos1998 , could you rebase with the latest trunk branch, and resolve 
the conflicts? I'd like to see if it still passes.





[jira] [Created] (KAFKA-16609) Update parse_describe_topic to support new topic describe output

2024-04-23 Thread Kirk True (Jira)
Kirk True created KAFKA-16609:
-

 Summary: Update parse_describe_topic to support new topic describe 
output
 Key: KAFKA-16609
 URL: https://issues.apache.org/jira/browse/KAFKA-16609
 Project: Kafka
  Issue Type: Bug
  Components: admin, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


It appears that recent changes to the describe topic output have broken the system test's ability to parse the output.

{noformat}
test_id:
kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=False.reassign_from_offset_zero=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   50.333 seconds


IndexError('list index out of range')
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py", line 175, in test_reassign_partitions
    self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 105, in run_produce_consume_validate
    core_test_action(*args)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py", line 175, in <lambda>
    self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py", line 82, in reassign_partitions
    partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py", line 1400, in parse_describe_topic
    fields = list(map(lambda x: x.split(" ")[1], fields))
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py", line 1400, in <lambda>
    fields = list(map(lambda x: x.split(" ")[1], fields))
IndexError: list index out of range
{noformat}





Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-23 Thread via GitHub


kirktrue commented on PR #15647:
URL: https://github.com/apache/kafka/pull/15647#issuecomment-2073652156

   @appchemist—thanks for the PR, and sorry for the delay in response!
   
   I've taken a first pass but am still working through the unit test changes.





[PR] KAFKA-16217: Stop the abort transaction try loop when closing producers (#15541) [kafka]

2024-04-23 Thread via GitHub


CalvinConfluent opened a new pull request, #15792:
URL: https://github.com/apache/kafka/pull/15792

   This is a mitigation fix for https://issues.apache.org/jira/browse/KAFKA-16217. Exceptions should not block closing the producers. This PR reverts a part of the change in #13591.
   
   Reviewers: Kirk True , Justine Olshan 

   
   





[PR] KAFKA-16217: Stop the abort transaction try loop when closing producers (#15541) [kafka]

2024-04-23 Thread via GitHub


CalvinConfluent opened a new pull request, #15791:
URL: https://github.com/apache/kafka/pull/15791

   
   This is a mitigation fix for https://issues.apache.org/jira/browse/KAFKA-16217. Exceptions should not block closing the producers. This PR reverts a part of the change in #13591.
   
   Reviewers: Kirk True , Justine Olshan 






Re: [PR] Add Integration tests for the the KIP-966 [kafka]

2024-04-23 Thread via GitHub


CalvinConfluent commented on PR #15759:
URL: https://github.com/apache/kafka/pull/15759#issuecomment-2073486845

   @mumrah @artemlivshits Can you help take a look?





[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840232#comment-17840232
 ] 

Matthias J. Sax commented on KAFKA-16584:
-

I would personally prefer to make it configurable. It should be a more or less simple change, but it requires a KIP.

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073





[jira] [Updated] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16584:

Labels: needs-kip newbie  (was: )

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073





[jira] [Assigned] (KAFKA-16608) AsyncKafkaConsumer doesn't honour interrupted thread status on KafkaConsumer.poll(Duration)

2024-04-23 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield reassigned KAFKA-16608:


Assignee: Andrew Schofield

> AsyncKafkaConsumer doesn't honour interrupted thread status on 
> KafkaConsumer.poll(Duration)
> ---
>
> Key: KAFKA-16608
> URL: https://issues.apache.org/jira/browse/KAFKA-16608
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Minor
>
> The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in 
> interrupted state is to throw InterruptException. The AsyncKafkaConsumer 
> doesn't do this. It only throws that exception if the interruption occurs 
> while it is waiting.





[jira] [Updated] (KAFKA-16608) AsyncKafkaConsumer doesn't honour interrupted thread status on KafkaConsumer.poll(Duration)

2024-04-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16608:
--
Component/s: consumer

> AsyncKafkaConsumer doesn't honour interrupted thread status on 
> KafkaConsumer.poll(Duration)
> ---
>
> Key: KAFKA-16608
> URL: https://issues.apache.org/jira/browse/KAFKA-16608
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Andrew Schofield
>Priority: Minor
>
> The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in 
> interrupted state is to throw InterruptException. The AsyncKafkaConsumer 
> doesn't do this. It only throws that exception if the interruption occurs 
> while it is waiting.





Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576836597


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() {
 return bootstrapMetadata;
 }
 
-public NavigableMap brokerNodes() {
+public Map brokerNodes() {

Review Comment:
   please ignore this comment, since we create all controllers before brokers, and hence the order of brokers does not matter.
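
   The signature change above relaxes `NavigableMap` to `Map`; the practical difference is the iteration-order guarantee. A minimal JDK sketch:

   ```
   NavigableMap<Integer, String> ordered = new TreeMap<>();
   ordered.put(2, "b");
   ordered.put(1, "a");
   // TreeMap iterates in ascending key order (1 then 2), the guarantee NavigableMap advertises.
   Map<Integer, String> plain = new HashMap<>(ordered);
   // HashMap makes no iteration-order promise, which is acceptable here since,
   // per the comment above, broker creation order no longer matters.
   ```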






[jira] [Created] (KAFKA-16608) AsyncKafkaConsumer doesn't honour interrupted thread status on KafkaConsumer.poll(Duration)

2024-04-23 Thread Andrew Schofield (Jira)
Andrew Schofield created KAFKA-16608:


 Summary: AsyncKafkaConsumer doesn't honour interrupted thread 
status on KafkaConsumer.poll(Duration)
 Key: KAFKA-16608
 URL: https://issues.apache.org/jira/browse/KAFKA-16608
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.8.0
Reporter: Andrew Schofield


The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in 
interrupted state is to throw InterruptException. The AsyncKafkaConsumer 
doesn't do this. It only throws that exception if the interruption occurs while 
it is waiting.





Re: [PR] KAFKA-14509: [3/4] Add integration test for consumerGroupDescribe Api [kafka]

2024-04-23 Thread via GitHub


riedelmax commented on code in PR #15727:
URL: https://github.com/apache/kafka/pull/15727#discussion_r1576821065


##
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala:
##
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import kafka.server.GroupCoordinatorBaseRequestTest
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.common.ConsumerGroupState
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, DescribedGroup, TopicPartitions}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Tag, Timeout}
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)

Review Comment:
   Hey @johnnychhsu 
   Thanks for reviewing :)
   
   AFAIK the new consumer group only works with KRAFT clusters. 
   @dajac can you confirm this?






Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15782:
URL: https://github.com/apache/kafka/pull/15782#issuecomment-2073234352

   Re-triggering QA since some builds got terminated.





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576751637


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+public class ConsumerGroupExecutor implements AutoCloseable {
+final int numThreads;
+final ExecutorService executor;
+final List consumers = new ArrayList<>();
+
+public ConsumerGroupExecutor(
+String broker,
+int numConsumers,
+String groupId,
+String groupProtocol,
+String topic,
+String strategy,
+Optional<String> remoteAssignor,
+Optional<Properties> customPropsOpt,
+boolean syncCommit

Review Comment:
   The `syncCommit` param seems to always be false. If that's the case, what's 
the purpose of it? (Is the intention to use this same common 
`ConsumerGroupExecutor` class from `ConsumerGroupCommandTest` too?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576738178


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+public class ConsumerGroupExecutor implements AutoCloseable {
+final int numThreads;
+final ExecutorService executor;
+final List consumers = new ArrayList<>();
+
+public ConsumerGroupExecutor(
+String broker,
+int numConsumers,
+String groupId,
+String groupProtocol,
+String topic,
+String strategy,

Review Comment:
   maybe rename this to `assignmentStrategy` for clarity on what it is



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576735920


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+public class ConsumerGroupExecutor implements AutoCloseable {
+final int numThreads;
+final ExecutorService executor;
+final List consumers = new ArrayList<>();
+
+public ConsumerGroupExecutor(
+String broker,
+int numConsumers,
+String groupId,
+String groupProtocol,
+String topic,
+String strategy,
+Optional<String> remoteAssignor,
+Optional<Properties> customPropsOpt,
+boolean syncCommit

Review Comment:
   We have two well-defined group types, classic and consumer, with different 
creation params, and having them all together seems a bit confusing (e.g. having 
an assignment strategy along with a remote assignor); it's not very intuitive when 
creating a new instance (which I expect several other test files will do as they 
get migrated). So I wonder if it would be clearer to split them into static 
builders that each take only the params they need, like:
   ```
   public static ConsumerGroupExecutor forConsumerGroup(
       String broker,
       int numConsumers,
       String groupId,
       String topic,
       Optional<String> remoteAssignor,
       Optional<Properties> customPropsOpt,
       boolean syncCommit) {

   }

   public static ConsumerGroupExecutor forClassicGroup(
       String broker,
       int numConsumers,
       String groupId,
       String topic,
       String strategy,
       Optional<Properties> customPropsOpt,
       boolean syncCommit) {

   }
   ```
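   
   For illustration, a hypothetical call site under this split (the broker address, 
group id, and topic below are placeholder values, not taken from this PR) could 
look like:
   
   ```
   // ConsumerGroupExecutor implements AutoCloseable, so try-with-resources
   // shuts the background consumers down when the test body finishes.
   try (ConsumerGroupExecutor executor = ConsumerGroupExecutor.forConsumerGroup(
           "localhost:9092", 1, "test.group", "foo",
           Optional.empty(), Optional.empty(), false)) {
       // exercise the command under test while the consumer polls in the background
   }
   ```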
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close

2024-04-23 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840191#comment-17840191
 ] 

Kirk True commented on KAFKA-16217:
---

[~calvinliu] - I noticed PR 15541 is merged, but the status of this issue is 
still "In Progress". The bug lists the fix versions as 3.8.0, 3.7.1, and 3.6.3. 
Was the change in PR 15541 back-ported to those earlier versions?

Thanks!

> Transactional producer stuck in IllegalStateException during close
> --
>
> Key: KAFKA-16217
> URL: https://issues.apache.org/jira/browse/KAFKA-16217
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Major
>  Labels: transactions
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
>
> The producer is stuck during the close. It keeps retrying to abort the 
> transaction but it never succeeds. 
> {code:java}
> [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | 
> producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> org.apache.kafka.clients.producer.internals.Sender run - [Producer 
> clientId=producer-transaction-ben
> ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, 
> transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> Error in kafka producer I/O thread while aborting transaction:
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) 
> {code}
> With the additional log, I found the root cause. If the producer is in a bad 
> transaction state(in my case, the TransactionManager.pendingTransition was 
> set to commitTransaction and did not get cleaned), then the producer calls 
> close and tries to abort the existing transaction, the producer will get 
> stuck in the transaction abortion. It is related to the fix 
> [https://github.com/apache/kafka/pull/13591].
>  
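
A minimal reproduction sketch of the pattern described above (the broker address, 
topic, and the short timeout are hypothetical; the real failure needs 
{{commitTransaction}} to time out first):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnStuckAbortSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-id");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100"); // short timeout to provoke a commit timeout
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("foo", "key", "value"));
            try {
                producer.commitTransaction(); // may time out, leaving pendingTransition set to commit
            } catch (TimeoutException e) {
                // The timed-out commit must be retried; aborting here (or via the
                // abort attempted during close()) raises the IllegalStateException
                // shown in the log above.
                producer.abortTransaction();
            }
        }
    }
}
{code}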



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16462) New consumer fails with timeout in security_test.py system test

2024-04-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16462.
---
Resolution: Duplicate

> New consumer fails with timeout in security_test.py system test
> ---
>
> Key: KAFKA-16462
> URL: https://issues.apache.org/jira/browse/KAFKA-16462
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{security_test.py}} system test fails with the following error:
> {noformat}
> test_id:
> kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 30.885 seconds
> TimeoutError('')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/security_test.py",
>  line 142, in test_client_ssl_endpoint_validation_failure
> wait_until(lambda: self.producer_consumer_have_expected_error(error), 
> timeout_sec=30)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16461) New consumer fails to consume records in security_test.py system test

2024-04-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16461:
--
Description: 
The {{security_test.py}} system test fails with the following error:

{quote}
* Consumer failed to consume up to offsets
{quote}

Affected test:

* {{test_client_ssl_endpoint_validation_failure}}

Cause:

The system test was failing because the {{VerifiableConsumer}} hit a 
{{NullPointerException}} during startup. The reason for the NPE was an attempt 
to put a {{null}} as the value of {{--group-remote-assignor}} in the 
{{Consumer}}'s configuration.



  was:
The {{security_test.py}} system test fails with the following error:

{quote}
* Consumer failed to consume up to offsets
{quote}

Affected test:

* {{test_client_ssl_endpoint_validation_failure}}


> New consumer fails to consume records in security_test.py system test
> -
>
> Key: KAFKA-16461
> URL: https://issues.apache.org/jira/browse/KAFKA-16461
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{security_test.py}} system test fails with the following error:
> {quote}
> * Consumer failed to consume up to offsets
> {quote}
> Affected test:
> * {{test_client_ssl_endpoint_validation_failure}}
> Cause:
> The system test was failing because the {{VerifiableConsumer}} hit a 
> {{NullPointerException}} during startup. The reason for the NPE was an 
> attempt to put a {{null}} as the value of {{--group-remote-assignor}} in the 
> {{Consumer}}'s configuration.
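
A minimal sketch of the implied fix (variable and helper names below are 
hypothetical): only set the remote assignor when a value was actually supplied, 
since {{Properties.put}} rejects {{null}} values.

{code:java}
// parseRemoteAssignorArg is a hypothetical helper that may return null when
// --group-remote-assignor was not passed; Properties.put(key, null) throws NPE.
String remoteAssignor = parseRemoteAssignorArg(args);
if (remoteAssignor != null) {
    consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, remoteAssignor);
}
{code}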



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16464) New consumer fails with timeout in replication_replica_failure_test.py system test

2024-04-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16464.
---
Resolution: Duplicate

> New consumer fails with timeout in replication_replica_failure_test.py system 
> test
> --
>
> Key: KAFKA-16464
> URL: https://issues.apache.org/jira/browse/KAFKA-16464
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{replication_replica_failure_test.py}} system test fails with the 
> following error:
> {noformat}
> test_id:
> kafkatest.tests.core.replication_replica_failure_test.ReplicationReplicaFailureTest.test_replication_with_replica_failure.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 20.972 seconds
> TimeoutError('Timed out after 30s while awaiting initial record delivery 
> of 5 records')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/replication_replica_failure_test.py",
>  line 97, in test_replication_with_replica_failure
> self.await_startup()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/end_to_end.py",
>  line 125, in await_startup
> (timeout_sec, min_records))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Timed out after 30s while awaiting initial 
> record delivery of 5 records
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576631689


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -59,87 +63,66 @@ public Builder setCombined(boolean combined) {
 }
 
 public Builder setNumControllerNodes(int numControllerNodes) {
-if (numControllerNodes < 0) {
-throw new RuntimeException("Invalid negative value for 
numControllerNodes");
-}
-
-while (controllerNodeBuilders.size() > numControllerNodes) {
-controllerNodeBuilders.pollFirstEntry();
-}
-while (controllerNodeBuilders.size() < numControllerNodes) {
-int nextId = startControllerId();
-if (!controllerNodeBuilders.isEmpty()) {
-nextId = controllerNodeBuilders.lastKey() + 1;
-}
-controllerNodeBuilders.put(nextId,
-new ControllerNode.Builder().
-setId(nextId));
-}
+this.numControllerNodes = numControllerNodes;
 return this;
 }
 
 public Builder setNumBrokerNodes(int numBrokerNodes) {
-return setBrokerNodes(numBrokerNodes, 1);
+this.numBrokerNodes = numBrokerNodes;
+return this;
+}
+
+public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+this.numDisksPerBroker = numDisksPerBroker;
+return this;
+}
+
+public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerProperties) {
+this.perBrokerProperties = Collections.unmodifiableMap(
+perBrokerProperties.entrySet().stream()
+.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
+return this;
 }
 
-public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+public TestKitNodes build() {
+if (numControllerNodes < 0) {
+throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+}
 if (numBrokerNodes < 0) {
 throw new RuntimeException("Invalid negative value for 
numBrokerNodes");
 }
-if (disksPerBroker <= 0) {
-throw new RuntimeException("Invalid value for disksPerBroker");
-}
-while (brokerNodeBuilders.size() > numBrokerNodes) {
-brokerNodeBuilders.pollFirstEntry();
-}
-while (brokerNodeBuilders.size() < numBrokerNodes) {
-int nextId = startBrokerId();
-if (!brokerNodeBuilders.isEmpty()) {
-nextId = brokerNodeBuilders.lastKey() + 1;
-}
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
-.setId(nextId)
-.setNumLogDirectories(disksPerBroker);
-brokerNodeBuilders.put(nextId, brokerNodeBuilder);
+if (numDisksPerBroker <= 0) {
+throw new RuntimeException("Invalid value for 
numDisksPerBroker");
 }
-return this;
-}
 
-public TestKitNodes build() {
 String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
-try {
-if (clusterId == null) {
-clusterId = Uuid.randomUuid();
-}
-TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
-for (ControllerNode.Builder builder : 
controllerNodeBuilders.values()) {
-ControllerNode node = builder.
-build(baseDirectory, clusterId, 
brokerNodeBuilders.containsKey(builder.id()));
-if (controllerNodes.put(node.id(), node) != null) {
-throw new RuntimeException("Duplicate builder for 
controller " + node.id());
-}
-}
-TreeMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
-for (BrokerNode.Builder builder : brokerNodeBuilders.values()) 
{
-BrokerNode node = builder.
-build(baseDirectory, clusterId, 
controllerNodeBuilders.containsKey(builder.id()));
-if (brokerNodes.put(node.id(), node) != null) {
-throw new RuntimeException("Duplicate builder for 
broker " + node.id());
-}
-}
-return new TestKitNodes(baseDirectory,
-clusterId,
-bootstrapMetadata,
-controllerNodes,
-brokerNodes);
-} catch (Exception e) {
-try {
-Files.delete(Paths.get(baseDirectory));
-} catch (Exception x) {
-throw new RuntimeException("Failed 

[jira] [Commented] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840163#comment-17840163
 ] 

Sagar Rao commented on KAFKA-16604:
---

[~chia7712], OK, I have assigned it to myself.

> Deprecate ConfigDef.ConfigKey constructor from public APIs
> --
>
> Key: KAFKA-16604
> URL: https://issues.apache.org/jira/browse/KAFKA-16604
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Currently, one can create ConfigKey by either invoking the public constructor 
> directly and passing it to a ConfigDef object or by invoking a bunch of 
> define methods. The 2 ways can get confusing at times. Moreover, it could 
> lead to errors as was noticed in KAFKA-16592
> We should ideally have only 1 way exposed to the users which IMO should be to 
> create the objects only through the exposed define methods. This ticket is 
> about marking the public constructor of ConfigKey as Deprecated first and 
> then making it private eventually.
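
For reference, a minimal sketch of the define-based path (the config name, 
default, and docs below are illustrative only):

{code:java}
import org.apache.kafka.common.config.ConfigDef;

// Keys are created only through define(...); the ConfigKey constructor stays internal.
ConfigDef def = new ConfigDef()
    .define("retries", ConfigDef.Type.INT, 3,
            ConfigDef.Importance.MEDIUM, "How many times to retry.");
{code}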



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-16604:
-

Assignee: Sagar Rao

> Deprecate ConfigDef.ConfigKey constructor from public APIs
> --
>
> Key: KAFKA-16604
> URL: https://issues.apache.org/jira/browse/KAFKA-16604
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Currently, one can create ConfigKey by either invoking the public constructor 
> directly and passing it to a ConfigDef object or by invoking a bunch of 
> define methods. The 2 ways can get confusing at times. Moreover, it could 
> lead to errors as was noticed in KAFKA-16592
> We should ideally have only 1 way exposed to the users which IMO should be to 
> create the objects only through the exposed define methods. This ticket is 
> about marking the public constructor of ConfigKey as Deprecated first and 
> then making it private eventually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16607) Update the KIP and metrics implementation to include the new state

2024-04-23 Thread Jira
José Armando García Sancio created KAFKA-16607:
--

 Summary: Update the KIP and metrics implementation to include the 
new state
 Key: KAFKA-16607
 URL: https://issues.apache.org/jira/browse/KAFKA-16607
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: José Armando García Sancio


KafkaRaftMetrics exposes a current-state metric that needs to be updated to 
include the new prospective state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


frankvicky commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072873129

   > @frankvicky @m1a2st It seems your PRs (#15766 and #15779) need a consumer 
running in the background. Hence, we can consider moving 
`AbstractConsumerGroupExecutor`/`ConsumerGroupExecutor` to an individual Java 
file. WDYT?
   
   It sounds good. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


lianetm commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072864210

   @chia7712 's comment makes sense to me. Also, heads-up: similar classes are 
already defined in 
[ConsumerGroupCommandTest](https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L315),
 so it's probably sensible to take a look at them all and consider consolidating


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Current working branch for KIP-848 kip [kafka]

2024-04-23 Thread via GitHub


lucasbru opened a new pull request, #15789:
URL: https://github.com/apache/kafka/pull/15789

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15788:
URL: https://github.com/apache/kafka/pull/15788#issuecomment-2072851173

   @johnnychhsu Instead of removing them, could you make `MetadataLogConfig` 
use those helper methods? 
   
   
https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/MetadataLogConfig.scala#L38


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840150#comment-17840150
 ] 

Mickael Maison commented on KAFKA-16606:


To me that kind of looks like a bug. Previously JBOD was not supported with 
KRaft at all, so it was not a big deal that the gating did not quite work. As 
we're approaching JBOD being production ready, we should smooth out the rough 
edges and avoid allowing weird unsupported configurations. 

As JBOD is not yet production ready, would we really break anyone's cluster if 
we prevented using multiple log dirs with KRaft when the metadata version is set 
below 3.7? I understand some people may disagree, but I wonder if it's a 
discussion worth having. 

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> 

Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576531833


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -84,17 +125,38 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
 assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
   }
 
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "false"),
+  @ClusterTests(Array(

Review Comment:
   Already added the TODO at the top of this file (L30).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072841232

   @frankvicky @m1a2st It seems your PRs (#15766 and #15779) need a consumer 
running in the background. Hence, we can consider moving 
`AbstractConsumerGroupExecutor`/`ConsumerGroupExecutor` to an individual Java 
file. WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576523223


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,440 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
-}
-
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
 
-String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+})
+public class DeleteConsumerGroupsTest {
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+private static final String MISSING_GROUP = "missing.group";
+
+private final ClusterInstance cluster;
+private final Iterable<GroupProtocol> groupProtocols;
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+this.groupProtocols = cluster.isKRaftTest()
+? Arrays.asList(CLASSIC, CONSUMER)
+: 

[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840148#comment-17840148
 ] 

Igor Soarez commented on KAFKA-16606:
-

That's right [~scholzj]. When KIP-858 started, the documentation was already 
explicit about JBOD support being one of the missing features in KRaft, but the 
configuration was already allowed, so it didn't seem right to change that at the 
time. In hindsight, maybe that would've been the better decision.

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> 

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-23 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1576516058


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are 
assigned partitions
+ * for the first time (full), or incrementally when a rebalance is 
triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new 

Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]

2024-04-23 Thread via GitHub


mfvitale commented on PR #15756:
URL: https://github.com/apache/kafka/pull/15756#issuecomment-2072820368

   > The `configure()` method on Transformation is only called with 
configurations provided with their prefix.
   
   This clears up my doubt. In that case it should be clearly declared as an SMT 
configuration. I'll open a KIP. Thanks for the explanation. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576508031


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,109 +58,141 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends 
ConsumerGroupCommandTest {
-String[] getArgs(String group, String topic) {
-return new String[] {
-"--bootstrap-server", bootstrapServers(listenerName()),
-"--delete-offsets",
-"--group", group,
-"--topic", topic
-};
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+public static final String TOPIC = "foo";
+public static final String GROUP = "test.group";
+private final ClusterInstance clusterInstance;
+
+private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
+private final Iterable<Map<String, Object>> consumerConfigs;
+
+DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) {
+this.clusterInstance = clusterInstance;
+this.consumerConfigs = clusterInstance.isKRaftTest()
+? Arrays.asList(
+new HashMap<String, Object>() {{

Review Comment:
   We can use an immutable map and make `createConsumer` create an inner mutable 
map to collect all the configs. 
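   
   A minimal sketch of that idea (the bootstrap address and deserializers below 
are assumed defaults, not taken from this PR):
   
   ```
   // Callers pass an immutable map of overrides; the mutable copy lives only here.
   static KafkaConsumer<byte[], byte[]> createConsumer(Map<String, Object> overrides) {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed default
       configs.putAll(overrides); // caller-supplied configs win
       return new KafkaConsumer<>(configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
   }
   ```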



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16475: add test for TopicImageNodeTest [kafka]

2024-04-23 Thread via GitHub


johnnychhsu commented on PR #15720:
URL: https://github.com/apache/kafka/pull/15720#issuecomment-2072812226

   thanks for the review @cmccabe ! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-23 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1576507146


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are 
assigned partitions
+ * for the first time (full), or incrementally when a rebalance is 
triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new 

Re: [PR] KAFKA-14509: [3/4] Add integration test for consumerGroupDescribe Api [kafka]

2024-04-23 Thread via GitHub


johnnychhsu commented on code in PR #15727:
URL: https://github.com/apache/kafka/pull/15727#discussion_r1576500340


##
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala:
##
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import kafka.server.GroupCoordinatorBaseRequestTest
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.common.ConsumerGroupState
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData
+import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, 
DescribedGroup, TopicPartitions}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Tag, Timeout}
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)

Review Comment:
   May I know if we also need to include ZK mode for this test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576492165


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -66,17 +69,30 @@ public Builder setNumLogDirectories(int numLogDirectories) {
 return this;
 }
 
-public BrokerNode build(
-String baseDirectory,
-Uuid clusterId,
-boolean combined
-) {
+public Builder setClusterId(Uuid clusterId) {
+this.clusterId = clusterId;
+return this;
+}
+
+public Builder setBaseDirectory(String baseDirectory) {
+this.baseDirectory = baseDirectory;
+return this;
+}
+
+public Builder setCombined(boolean combined) {
+this.combined = combined;
+return this;
+}
+
+public Builder setPropertyOverrides(Map 
propertyOverrides) {
+this.propertyOverrides = Collections.unmodifiableMap(new 
HashMap<>(propertyOverrides));
+return this;
+}
+
+public BrokerNode build() {
 if (id == -1) {
 throw new RuntimeException("You must set the node id.");
 }
-if (incarnationId == null) {
-incarnationId = Uuid.randomUuid();
-}
 List<String> logDataDirectories = IntStream

Review Comment:
   Could you add a null check for `baseDirectory`?
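   
   Something along these lines, mirroring the existing id check (a sketch only; 
the exact message wording is just a suggestion):
   
   ```
   if (baseDirectory == null) {
       throw new RuntimeException("You must set the base directory.");
   }
   ```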



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) {
 }
 
 public Builder setNumControllerNodes(int numControllerNodes) {
-if (numControllerNodes < 0) {
-throw new RuntimeException("Invalid negative value for 
numControllerNodes");
-}
-
-while (controllerNodeBuilders.size() > numControllerNodes) {
-controllerNodeBuilders.pollFirstEntry();
-}
-while (controllerNodeBuilders.size() < numControllerNodes) {
-int nextId = startControllerId();
-if (!controllerNodeBuilders.isEmpty()) {
-nextId = controllerNodeBuilders.lastKey() + 1;
-}
-controllerNodeBuilders.put(nextId,
-new ControllerNode.Builder().
-setId(nextId));
-}
+this.numControllerNodes = numControllerNodes;
 return this;
 }
 
 public Builder setNumBrokerNodes(int numBrokerNodes) {
-return setBrokerNodes(numBrokerNodes, 1);
+this.numBrokerNodes = numBrokerNodes;
+return this;
+}
+
+public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+this.numDisksPerBroker = numDisksPerBroker;
+return this;
+}
+
+public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerProperties) {
+this.perBrokerProperties = Collections.unmodifiableMap(
+perBrokerProperties.entrySet().stream()
+.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
+return this;
 }
 
-public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+public TestKitNodes build() {
+if (numControllerNodes < 0) {
+throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+}
 if (numBrokerNodes < 0) {
 throw new RuntimeException("Invalid negative value for 
numBrokerNodes");
 }
-if (disksPerBroker <= 0) {
-throw new RuntimeException("Invalid value for disksPerBroker");
-}
-while (brokerNodeBuilders.size() > numBrokerNodes) {
-brokerNodeBuilders.pollFirstEntry();
+if (numDisksPerBroker <= 0) {
+throw new RuntimeException("Invalid value for 
numDisksPerBroker");
 }
-while (brokerNodeBuilders.size() < numBrokerNodes) {
-int nextId = startBrokerId();
-if (!brokerNodeBuilders.isEmpty()) {
-nextId = brokerNodeBuilders.lastKey() + 1;
-}
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
-.setId(nextId)
-.setNumLogDirectories(disksPerBroker);
-brokerNodeBuilders.put(nextId, brokerNodeBuilder);
-}
-return this;
-}
 
-public TestKitNodes build() {
 String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();

Review Comment:
   We don't need to delete `baseDirectory` since `TestUtils.tempDirectory()` 
will delete the return folder when terminating.



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -167,11 

Re: [PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]

2024-04-23 Thread via GitHub


johnnychhsu commented on PR #15787:
URL: https://github.com/apache/kafka/pull/15787#issuecomment-2072775161

   nice fix! crystal clear solution :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Jakub Scholz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakub Scholz resolved KAFKA-16606.
--
Resolution: Not A Problem

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> 

Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576457731


##
core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala:
##
@@ -29,9 +29,9 @@ import scala.jdk.CollectionConverters._
  */
 class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: 
Logger) {
   import LinuxIoMetricsCollector._
-  var lastUpdateMs: Long = -1L
-  var cachedReadBytes:Long = 0L
-  var cachedWriteBytes:Long = 0L
+  private var lastUpdateMs: Long = -1L

Review Comment:
   It seems the type declaration is unnecessary since the literal ends with `L`.



##
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##
@@ -347,7 +347,7 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
 if (isKRaftTest()) {
   val result = new util.HashMap[Uuid, String]()
   
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach
 {

Review Comment:
   how about 
`controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach((k,
 v) => result.put(v, k))`



##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -107,16 +107,16 @@ class SharedServer(
   @volatile var brokerMetrics: BrokerServerMetrics = _
   @volatile var controllerServerMetrics: ControllerMetadataMetrics = _
   @volatile var loader: MetadataLoader = _
-  val snapshotsDisabledReason = new AtomicReference[String](null)
+  private val snapshotsDisabledReason = new AtomicReference[String](null)
   @volatile var snapshotEmitter: SnapshotEmitter = _
-  @volatile var snapshotGenerator: SnapshotGenerator = _
-  @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _
+  @volatile private var snapshotGenerator: SnapshotGenerator = _
+  @volatile private var metadataLoaderMetrics: MetadataLoaderMetrics = _
 
   def clusterId: String = metaPropsEnsemble.clusterId().get()
 
-  def nodeId: Int = metaPropsEnsemble.nodeId().getAsInt()
+  def nodeId: Int = metaPropsEnsemble.nodeId().getAsInt
 
-  def isUsed(): Boolean = synchronized {
+  private def isUsed(): Boolean = synchronized {

Review Comment:
   How about `isUsed`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


frankvicky commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072734321

   Hi @lianetm, @chia7712 
   Thanks for the suggestions, I have addressed the comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-23 Thread via GitHub


johnnychhsu opened a new pull request, #15788:
URL: https://github.com/apache/kafka/pull/15788

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]

2024-04-23 Thread via GitHub


mimaison commented on PR #15756:
URL: https://github.com/apache/kafka/pull/15756#issuecomment-2072692296

   While it's not directly adding new configurations, it's effectively changing 
the behavior of `ExtractField` and `InsertField` and making them support new 
configurations. The `configure()` method on Transformation is only called with 
configurations provided with their prefix. In this example a user would need to 
set something like:
   
   ```
   transforms=my-smt
   transforms.my-smt.type=MyTransformation
   transforms.my-smt.key.converter.replace.null.with.default=false
   ```
   
   Also by not defining the configurations explicitly users can't discover them 
using the `GET /connector-plugins/{plugin-type}/config`.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Jakub Scholz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840132#comment-17840132
 ] 

Jakub Scholz commented on KAFKA-16606:
--

[~soarez] That sounds like, without using the 3.7-IV2 metadata version, JBOD in 
KRaft works fine until a disk fails, and then it might cause problems. So I 
wonder if it would have been better to protect users from that situation.

But it sounds like it was an intentional decision. And as I said, changing it 
now might be a bit tricky anyway, as it might break someone's cluster after 
upgrading to 3.7.1 / 3.8.0. So I guess it sounds good as it is, and I will close 
this. Thanks for the explanation.

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> 

Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576432963


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -121,16 +146,16 @@ public BrokerNode build(
 private final boolean combined;
 private final Map<String, String> propertyOverrides;
 
-BrokerNode(
+private BrokerNode(
 Uuid incarnationId,
 MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
 boolean combined,
 Map<String, String> propertyOverrides
 ) {
-this.incarnationId = incarnationId;
-this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble;

Review Comment:
   I made `logDataDirectories()` return `Set<String>` instead of `List<String>` to 
get rid of the redundant ArrayList wrapping.



##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -121,16 +146,16 @@ public BrokerNode build(
 private final boolean combined;
 private final Map<String, String> propertyOverrides;
 
-BrokerNode(
+private BrokerNode(
 Uuid incarnationId,
 MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
 boolean combined,
 Map<String, String> propertyOverrides
 ) {
-this.incarnationId = incarnationId;
-this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble;

Review Comment:
   I made `logDataDirectories()` return `Set<String>` instead of `List<String>` to 
get rid of the redundant ArrayList wrapping.
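   
   A hedged sketch of the accessor after that change (assuming the directories are stored as a `Set<String>` internally):
   
   ```
   public Set<String> logDataDirectories() {
       // Expose the stored set directly instead of copying it into a
       // new ArrayList on every call.
       return Collections.unmodifiableSet(logDataDirectories);
   }
   ```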



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576432135


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +256,7 @@ private KafkaProducer<byte[], byte[]> createProducer(Properties config) {
 }
 
 private Consumer<byte[], byte[]> createConsumer(Properties config) {

Review Comment:
   Updated it and also added an empty map for zk, otherwise zk would not be tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576422765


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -959,7 +960,7 @@ class ZkAdminManager(val config: KafkaConfig,
 } else if (requestStatus.mechanism == Some(ScramMechanism.UNKNOWN)) {
   (requestStatus.user, unknownScramMechanismMsg)

Review Comment:
   It looks weird to me but it seems `contains()` is Scala idiomatic, so I made 
the change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576420731


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -144,7 +144,7 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =

Review Comment:
   You're right, this is unused -> removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576409478


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
  

[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840125#comment-17840125
 ] 

Igor Soarez commented on KAFKA-16606:
-

Thanks for bringing this to my attention [~mimaison].

Hi [~scholzj], thanks for pointing this out. I think there's some confusion 
here with a JBOD configuration being +allowed+ vs being +supported+ in KRaft.

In terms of just reading and writing data to multiple log directories, as long 
as those directories are always available, there's nothing special about KRaft 
that would require changes compared with ZK mode. What is enabled with 3.7 is 
the handling of failed log directories. You'll find that partitions don't get 
new leaders elected – becoming indefinitely unavailable – if the log directory 
for the leader replica fails but the broker stays alive.

If a single directory is configured and it becomes unavailable the broker shuts 
down, as there is no point in continuing to run without access to storage. When 
it shuts down the controller becomes aware of that – via an ephemeral znode in 
ZK mode, or via missing heartbeats in KRaft – and it will re-elect leaders for 
partitions that were led by the broker. When multiple directories are 
configured, it is critical to have a separate mechanism to let the controller 
know there is a partial failure – the broker is still alive and operational on 
the remaining log dirs, but any partitions on the directory that failed need a 
leadership and ISR update.

In ZK mode that was handled by notifying the controller via a znode, so an 
alternative solution was required for KRaft. You can find the details in 
[KIP-858|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft].

Let me know if that makes sense.

 

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> 

Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576385492


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
  

Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576392526


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
 

Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]

2024-04-23 Thread via GitHub


nikramakrishnan commented on PR #15241:
URL: https://github.com/apache/kafka/pull/15241#issuecomment-2072530250

   Bump! @satishd @kamalcph can we get this review going? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576386813


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +256,7 @@ private KafkaProducer<byte[], byte[]> createProducer(Properties config) {
 }
 
 private Consumer<byte[], byte[]> createConsumer(Properties config) {

Review Comment:
   We can change the type from `Properties` to `Map<String, Object>`. With that 
change we don't need to create a lot of `Properties` in each test case.
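   
   For example, a hedged sketch of the reworked helper (names are illustrative, not the exact test code, and the usual client imports are assumed):
   
   ```
   private Consumer<byte[], byte[]> createConsumer(Map<String, Object> configOverrides) {
       Map<String, Object> config = new HashMap<>();
       config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
       config.put(ConsumerConfig.GROUP_ID_CONFIG, "test.group");
       config.putAll(configOverrides);
       // Callers can now pass Map.of(...) instead of building a Properties object.
       return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
   }
   ```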



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576375031


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -2192,8 +2192,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   @ValueSource(strings = Array("zk", "kraft"))
   def testLongTopicNames(quorum: String): Unit = {
 val client = Admin.create(createConfig)
-val longTopicName = String.join("", Collections.nCopies(249, "x"));
-val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
+val longTopicName = String.join("", Collections.nCopies(249, "x"))

Review Comment:
   Small Scala suggestion here (feel free to ignore): we can use 
`List.fill(249)("x").mkString("")` instead of Java's `String.join` and 
`Collections.nCopies`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-23 Thread via GitHub


KevinZTW commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1576372796


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class PartitionMetadataFileTest  {
+private final File dir = TestUtils.tempDirectory();
+private final File file = PartitionMetadataFile.newFile(dir);
+
+@Test
+public void testSetRecordWithDifferentTopicId() {
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+partitionMetadataFile.record(topicId);
+Uuid differentTopicId = Uuid.randomUuid();
+assertThrows(InconsistentTopicIdException.class, () -> 
partitionMetadataFile.record(differentTopicId));
+}
+
+@Test
+public void testSetRecordWithSameTopicId() {
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+partitionMetadataFile.record(topicId);
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));

Review Comment:
   Sorry, I missed this one; just updated the PR as suggested!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576375031


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -2192,8 +2192,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   @ValueSource(strings = Array("zk", "kraft"))
   def testLongTopicNames(quorum: String): Unit = {
 val client = Admin.create(createConfig)
-val longTopicName = String.join("", Collections.nCopies(249, "x"));
-val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
+val longTopicName = String.join("", Collections.nCopies(249, "x"))

Review Comment:
   Small Scala suggestion here (feel free to ignore): we can use 
`List.fill(249)("x").mkString("")` instead of Java's `String.join` and 
`Collections.nCopies`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840118#comment-17840118
 ] 

Mickael Maison commented on KAFKA-16606:


cc [~soarez] 

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> 

Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576366593


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   Makes sense, don't worry about it. We are moving more and more from Scala to 
Java anyway, so these will get resolved over time. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576363327


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   That's just my guesstimate, but I expect it to be large. I kind of focused 
on the low-hanging fruit here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576360092


##
core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala:
##
@@ -52,8 +51,8 @@ object ZooKeeperClient {
  * @param sessionTimeoutMs session timeout in milliseconds
  * @param connectionTimeoutMs connection timeout in milliseconds
  * @param maxInFlightRequests maximum number of unacknowledged requests the 
client will send before blocking.
+ * @param clientConfig ZooKeeper client configuration, for TLS configs if 
desired

Review Comment:
   Nice, I believe we might need to update the Javadoc of 
`registerStateChangeHandler` and `unregisterStateChangeHandler` in the same file 
as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576358398


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -300,7 +300,7 @@ class ReplicaManager(val config: KafkaConfig,
   protected val allPartitions = new Pool[TopicPartition, HostedPartition](

Review Comment:
   Ideally we should declare types for all public and protected fields but this 
is a huge change.
   
   Also while it's useful in some cases, in many I find it's not adding much 
value. In this specific example I even find it annoying as you get:
   ```
   protected val allPartitions: Pool[TopicPartition, HostedPartition] = new 
Pool[TopicPartition, HostedPartition](
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576351984


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   Yeah, in Scala braces are not required around multi-line blocks. I've not 
made this change because braces are required in Java and we have them in Scala 
all over the code base. Changing this is probably a >500-line diff.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on PR #15786:
URL: https://github.com/apache/kafka/pull/15786#issuecomment-2072445566

   We also have a couple of out-of-date parameters in Javadocs; we can either fix 
them here or in another follow-up PR:
   - 
https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L150
 
   - 
https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L270


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576354443


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   Ouch, >500 is too much. Well, they will get cleaned up as we move more Scala 
to Java :D 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Jakub Scholz (Jira)
Jakub Scholz created KAFKA-16606:


 Summary: JBOD support in KRaft does not seem to be gated by the 
metadata version
 Key: KAFKA-16606
 URL: https://issues.apache.org/jira/browse/KAFKA-16606
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Jakub Scholz


JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka [source 
code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
 suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
However, it seems to be possible to run KRaft cluster with JBOD even with older 
metadata versions such as {{{}3.6{}}}. For example, I have a cluster using the 
{{3.6}} metadata version:
{code:java}
bin/kafka-features.sh --bootstrap-server localhost:9092 describe
Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
{code}
Yet a KRaft cluster with JBOD seems to run fine:
{code:java}
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
Querying brokers for log directories information
Received log directory information from brokers 2000,3000,1000
