[jira] [Created] (KAFKA-16885) Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to RemoteLogManagerConfig#isRemoteStorageSystemEnabled

2024-06-03 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16885:
--

 Summary: Consider renaming 
RemoteLogManagerConfig#enableRemoteStorageSystem to 
RemoteLogManagerConfig#isRemoteStorageSystemEnabled
 Key: KAFKA-16885
 URL: https://issues.apache.org/jira/browse/KAFKA-16885
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see the discussion: 
https://github.com/apache/kafka/pull/16153#issuecomment-2144269279
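
For reference, a small, purely hypothetical sketch (not the actual RemoteLogManagerConfig code) of the naming convention behind the proposal: a boolean getter conventionally reads as a predicate (`isXxxEnabled`), while `enableXxx` reads like an action that mutates state.

```java
// Hypothetical illustration of the proposed predicate-style accessor name.
public final class ConfigNamingSketch {
    private final boolean remoteStorageSystemEnabled;

    public ConfigNamingSketch(boolean remoteStorageSystemEnabled) {
        this.remoteStorageSystemEnabled = remoteStorageSystemEnabled;
    }

    // Reads as a question about state, matching JavaBeans boolean-getter style.
    public boolean isRemoteStorageSystemEnabled() {
        return remoteStorageSystemEnabled;
    }
}
```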





Re: [PR] KAFKA-16859: Cleanup check if tiered storage is enabled [kafka]

2024-06-03 Thread via GitHub


chia7712 commented on PR #16153:
URL: https://github.com/apache/kafka/pull/16153#issuecomment-2146674263

   I filed the ticket https://issues.apache.org/jira/browse/KAFKA-16885. We can 
have more discussion there.





Re: [PR] KAFKA-10787: Update spotless version and remove support JDK8 [kafka]

2024-06-03 Thread via GitHub


chia7712 commented on code in PR #16176:
URL: https://github.com/apache/kafka/pull/16176#discussion_r1625383681


##
build.gradle:
##
@@ -47,9 +47,11 @@ plugins {
   // Updating the shadow plugin version to 8.1.1 causes issue with signing and 
publishing the shadowed
   // artifacts - see https://github.com/johnrengelman/shadow/issues/901
   id 'com.github.johnrengelman.shadow' version '8.1.0' apply false
-  //  the minimum required JRE of 6.14.0+ is 11
+  //  the spotless before 6.14.0 support JDK8, after 6.23.3 support JDK21

Review Comment:
   ```
   Spotless 6.13.0 has an issue with Java 21 (see 
https://github.com/diffplug/spotless/pull/1920), and Spotless 6.14.0+ requires 
JRE 11.
   
   We are going to drop JDK 8 support. Hence, Spotless is upgraded to the newest 
version and is applied only when the build environment is compatible with JDK 11.
   ```
   
   WDYT?






[jira] (KAFKA-4094) Fix importance labels for Kafka Server config

2024-06-03 Thread Abhi (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-4094 ]


Abhi deleted comment on KAFKA-4094:
-

was (Author: JIRAUSER305362):
Hi [~jkreps] 
Can you describe this issue in more detail, and also mention which component it belongs to?

As per my understanding, you want the server configs labeled like:

HIGH - broker.id, log.dirs, num.partitions

MEDIUM - log.segment.bytes, log.cleanup.policy

LOW -  message.max.bytes, replica.fetch.max.bytes

> Fix importance labels for Kafka Server config
> -
>
> Key: KAFKA-4094
> URL: https://issues.apache.org/jira/browse/KAFKA-4094
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Priority: Major
>  Labels: newbie
>
> We have > 100 server configs. The importance label is meant to help people 
> navigate this in a sane way. The intention is something like the following:
> HIGH - things you must think about and set
> MEDIUM - things you don't necessarily need to set but that you might want to 
> tune
> LOW - things you probably don't need to set
> Currently we have a gazillion things marked as high including very subtle 
> tuning params and also things marked as deprecated (which probably should be 
> its own importance level). This makes it really hard for people to figure out 
> which configurations to actually learn about and use.
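
As background for the labels above, a minimal sketch (not from the issue itself) of how importance is attached to a config via Kafka's ConfigDef; the HIGH/MEDIUM/LOW labels are the `Importance` values passed to `define()`:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

// Sketch: each define() call carries the importance label shown in the docs.
class ServerConfigSketch {
    static final ConfigDef CONFIG = new ConfigDef()
        // HIGH: something an operator must think about and set.
        .define("broker.id", Type.INT, Importance.HIGH,
                "The broker id for this server.")
        // MEDIUM: safe by default, but worth tuning.
        .define("log.segment.bytes", Type.INT, 1024 * 1024 * 1024, Importance.MEDIUM,
                "The maximum size of a single log file.")
        // LOW: rarely needs to be set.
        .define("log.cleaner.backoff.ms", Type.LONG, 15 * 1000L, Importance.LOW,
                "The amount of time to sleep when there are no logs to clean.");
}
```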





Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-06-03 Thread via GitHub


rreddy-22 commented on PR #16068:
URL: https://github.com/apache/kafka/pull/16068#issuecomment-2146608185

   **Trunk**
   ```
   Benchmark                               (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
   TargetAssignmentBuilderBenchmark.build              1                         10           100  avgt    5  4.292 ± 0.132  ms/op
   TargetAssignmentBuilderBenchmark.build              1                         10          1000  avgt    5  5.477 ± 1.292  ms/op
   ```
   **Patch**
   ```
   Benchmark                               (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
   TargetAssignmentBuilderBenchmark.build              1                         10           100  avgt    5  4.701 ± 0.257  ms/op
   TargetAssignmentBuilderBenchmark.build              1                         10          1000  avgt    5  5.492 ± 0.269  ms/op
   ```





Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-06-03 Thread via GitHub


rreddy-22 commented on PR #16068:
URL: https://github.com/apache/kafka/pull/16068#issuecomment-2146607344

   Trunk





Re: [PR] MINOR: migrate ReassignPartitionsIntegrationTest to use ClusterTestExtensions [kafka]

2024-06-03 Thread via GitHub


FrankYang0529 commented on PR #15675:
URL: https://github.com/apache/kafka/pull/15675#issuecomment-2146578561

   @chia7712 I addressed all the comments. Thanks for the review.





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   One drawback of the current approach is that the thread pool will show as all 
"busy" in the metrics while the threads are waiting for the quota to become 
available. This can raise false alarms if a user configures a 100% thread-pool 
usage alert. We can mention this behavior in the docs section.
   






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   One drawback of the current approach is that the thread pool will show as all 
"busy" in the metrics. This can raise false alarms if a user configures an 
alert on top of it. We can mention this behavior in the docs section.
   






Re: [PR] MINOR: Fix typo in MetadataVersion.IBP_4_0_IV0 [kafka]

2024-06-03 Thread via GitHub


jolshan commented on PR #16181:
URL: https://github.com/apache/kafka/pull/16181#issuecomment-2146534437

   Cherrypicked to 3.8 as well.





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   @abhijeetk88 
   
   One drawback of the current approach is that the thread pool will show as all 
"busy" in the metrics. Could you confirm whether the threads are "shown as" 
busy while they are in `await`?
   






Re: [PR] MINOR: Fix typo in MetadataVersion.IBP_4_0_IV0 [kafka]

2024-06-03 Thread via GitHub


jolshan merged PR #16181:
URL: https://github.com/apache/kafka/pull/16181





Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on PR #15241:
URL: https://github.com/apache/kafka/pull/15241#issuecomment-2146529487

   @jeqo 
   
   Any updates on this PR?





Re: [PR] KAFKA-16882: Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra [kafka]

2024-06-03 Thread via GitHub


brandboat commented on code in PR #16180:
URL: https://github.com/apache/kafka/pull/16180#discussion_r1625299519


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##
@@ -197,333 +206,159 @@ public void 
testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
 for (Map.Entry entry : 
expectedEpochToHighestOffset.entrySet()) {
 Integer epoch = entry.getKey();
 Long expectedOffset = entry.getValue();
-Optional offset = 
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch);
-log.debug("Fetching highest offset for epoch: {} , returned: 
{} , expected: {}", epoch, offset, expectedOffset);
-Assertions.assertEquals(Optional.of(expectedOffset), offset);
+Optional offset = 
metadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+assertEquals(Optional.of(expectedOffset), offset);
 }
 
 // Search for non existing leader epoch
-Optional highestOffsetForEpoch5 = 
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(5);
-Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
-} finally {
-Utils.closeQuietly(remoteLogSegmentLifecycleManager, 
"RemoteLogSegmentLifecycleManager");
+Optional highestOffsetForEpoch5 = 
metadataManager.highestOffsetForEpoch(topicIdPartition, 5);
+assertFalse(highestOffsetForEpoch5.isPresent());
 }
 }
 
-private RemoteLogSegmentMetadata 
createSegmentUpdateWithState(RemoteLogSegmentLifecycleManager 
remoteLogSegmentLifecycleManager,
-  Map segmentLeaderEpochs,
-  long 
startOffset,
-  long 
endOffset,
-  
RemoteLogSegmentState state)
-throws RemoteStorageException {
+private RemoteLogSegmentMetadata 
upsertSegmentState(RemoteLogMetadataManager metadataManager,
+Map 
segmentLeaderEpochs,
+long startOffset,
+long endOffset,
+RemoteLogSegmentState 
state)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
 RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
-RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, BROKER_ID_0,
-   
 time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
-
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
-
-RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
-Optional.empty(),
-state, BROKER_ID_1);
-
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
-
+RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+-1L, brokerId0, time.milliseconds(), segSize, 
segmentLeaderEpochs);
+metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
+
+RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId,
+time.milliseconds(), Optional.empty(), state, brokerId1);
+
metadataManager.updateRemoteLogSegmentMetadata(segMetadataUpdate).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segMetadataUpdate);
 return segmentMetadata.createWithUpdates(segMetadataUpdate);
 }
 
-private static class EpochOffset {
-final int epoch;
-final long offset;
-
-private EpochOffset(int epoch,
-long offset) {
-this.epoch = epoch;
-this.offset = offset;
-}
-
-@Override
-public boolean equals(Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-EpochOffset that = (EpochOffset) o;
-return epoch == that.epoch && offset == that.offset;
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(epoch, offset);
-}
-
-@Override
-

Re: [PR] KAFKA-16882: Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on code in PR #16180:
URL: https://github.com/apache/kafka/pull/16180#discussion_r1625298782


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##
@@ -197,333 +206,159 @@ public void 
testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
 for (Map.Entry entry : 
expectedEpochToHighestOffset.entrySet()) {
 Integer epoch = entry.getKey();
 Long expectedOffset = entry.getValue();
-Optional offset = 
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch);
-log.debug("Fetching highest offset for epoch: {} , returned: 
{} , expected: {}", epoch, offset, expectedOffset);
-Assertions.assertEquals(Optional.of(expectedOffset), offset);
+Optional offset = 
metadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+assertEquals(Optional.of(expectedOffset), offset);
 }
 
 // Search for non existing leader epoch
-Optional highestOffsetForEpoch5 = 
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(5);
-Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
-} finally {
-Utils.closeQuietly(remoteLogSegmentLifecycleManager, 
"RemoteLogSegmentLifecycleManager");
+Optional highestOffsetForEpoch5 = 
metadataManager.highestOffsetForEpoch(topicIdPartition, 5);
+assertFalse(highestOffsetForEpoch5.isPresent());
 }
 }
 
-private RemoteLogSegmentMetadata 
createSegmentUpdateWithState(RemoteLogSegmentLifecycleManager 
remoteLogSegmentLifecycleManager,
-  Map segmentLeaderEpochs,
-  long 
startOffset,
-  long 
endOffset,
-  
RemoteLogSegmentState state)
-throws RemoteStorageException {
+private RemoteLogSegmentMetadata 
upsertSegmentState(RemoteLogMetadataManager metadataManager,
+Map 
segmentLeaderEpochs,
+long startOffset,
+long endOffset,
+RemoteLogSegmentState 
state)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
 RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
-RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, BROKER_ID_0,
-   
 time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
-
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
-
-RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
-Optional.empty(),
-state, BROKER_ID_1);
-
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
-
+RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+-1L, brokerId0, time.milliseconds(), segSize, 
segmentLeaderEpochs);
+metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
+
+RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId,
+time.milliseconds(), Optional.empty(), state, brokerId1);
+
metadataManager.updateRemoteLogSegmentMetadata(segMetadataUpdate).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segMetadataUpdate);
 return segmentMetadata.createWithUpdates(segMetadataUpdate);
 }
 
-private static class EpochOffset {
-final int epoch;
-final long offset;
-
-private EpochOffset(int epoch,
-long offset) {
-this.epoch = epoch;
-this.offset = offset;
-}
-
-@Override
-public boolean equals(Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-EpochOffset that = (EpochOffset) o;
-return epoch == that.epoch && offset == that.offset;
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(epoch, offset);
-}
-
-@Override
-

Re: [PR] KAFKA-16715: Create KafkaShareConsumer interfaces [kafka]

2024-06-03 Thread via GitHub


omkreddy merged PR #16134:
URL: https://github.com/apache/kafka/pull/16134





Re: [PR] MINOR; Log reason for deletion of KRaft snapshot [kafka]

2024-06-03 Thread via GitHub


github-actions[bot] commented on PR #15450:
URL: https://github.com/apache/kafka/pull/15450#issuecomment-2146521360

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625290704


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {
+logger.debug("Quota exceeded for copying 
log segments, waiting for the quota to be available.");
+// If the thread gets interrupted while 
waiting, the InterruptedException is thrown
+// back to the caller. It's important to 
note that the task being executed is already
+// cancelled before the executing thread 
is interrupted. The caller is responsible
+// for handling the exception gracefully 
by checking if the task is already cancelled.

Review Comment:
   Could you cover shutdown while the quota is breached in a unit test?
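
For readers following along, a minimal self-contained sketch, with hypothetical names, of the lock-and-wait pattern shown in the diff above: the copier thread holds a lock and re-checks the quota on a condition with a timeout, and an interrupt propagates to the caller. While a thread sits in `await()` it still counts as busy from the pool's perspective, which is the metrics concern raised earlier in this thread.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class CopyThrottleSketch {
    private final ReentrantLock copyQuotaManagerLock = new ReentrantLock();
    private final Condition quotaAvailable = copyQuotaManagerLock.newCondition();
    private volatile boolean quotaExceeded = true; // stand-in for rlmCopyQuotaManager.isQuotaExceeded()

    void awaitQuota() throws InterruptedException {
        copyQuotaManagerLock.lock();
        try {
            while (quotaExceeded) {
                // Re-check periodically; an InterruptedException propagates to the
                // caller, which is expected to have cancelled the task already.
                quotaAvailable.await(1, TimeUnit.SECONDS);
            }
        } finally {
            copyQuotaManagerLock.unlock();
        }
    }
}
```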






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


showuon commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625271623


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));
+fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+} catch (AssertionFailedError e) {
+// Fail the test if the operation completed within the timeout
+if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) 
{
+fail(e.getMessage());
+}
+}
+
+// Verify the highest offset in remote storage is updated only once
+ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class);
+verify(mockLog, 

Re: [PR] KAFKA-10787: Update spotless version and remove support JDK8 [kafka]

2024-06-03 Thread via GitHub


gongxuanzhang commented on PR #16176:
URL: https://github.com/apache/kafka/pull/16176#issuecomment-2146474883

   @chia7712 please take a look





Re: [PR] KAFKA-15355: Message schema changes [kafka]

2024-06-03 Thread via GitHub


tisonkun commented on code in PR #14290:
URL: https://github.com/apache/kafka/pull/14290#discussion_r1625260173


##
clients/src/main/resources/common/message/BrokerHeartbeatRequest.json:
##
@@ -30,6 +30,8 @@
 { "name": "WantFence", "type": "bool", "versions": "0+",
   "about": "True if the broker wants to be fenced, false otherwise." },
 { "name": "WantShutDown", "type": "bool", "versions": "0+",
-  "about": "True if the broker wants to be shut down, false otherwise." }
+  "about": "True if the broker wants to be shut down, false otherwise." },
+{ "name": "OfflineLogDirs", "type":  "[]uuid", "versions": "1+", 
"taggedVersions": "1+", "tag": "0",

Review Comment:
   Shall this `"tag": "0"` be `"tag": 0`?






Re: [PR] KAFKA-15305: The background thread should try to process the remaining task until the shutdown timer is expired. [kafka]

2024-06-03 Thread via GitHub


frankvicky commented on code in PR #16156:
URL: https://github.com/apache/kafka/pull/16156#discussion_r1625253815


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -293,12 +293,10 @@ private void closeInternal(final Duration timeout) {
  * Check the unsent queue one last time and poll until all requests are 
sent or the timer runs out.
  */
 private void sendUnsentRequests(final Timer timer) {
-if (networkClientDelegate.unsentRequests().isEmpty())
-return;
-do {
+while (timer.notExpired() && 
networkClientDelegate.hasAnyPendingRequests()) {

Review Comment:
   Hi @lianetm, thank you for pointing out this potential bug. I will fix it.
 






Re: [PR] KAFKA-16882: Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra [kafka]

2024-06-03 Thread via GitHub


brandboat commented on code in PR #16180:
URL: https://github.com/apache/kafka/pull/16180#discussion_r1625252312


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##
@@ -197,333 +206,159 @@ public void 
testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
 for (Map.Entry entry : 
expectedEpochToHighestOffset.entrySet()) {
 Integer epoch = entry.getKey();
 Long expectedOffset = entry.getValue();
-Optional offset = 
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch);
-log.debug("Fetching highest offset for epoch: {} , returned: 
{} , expected: {}", epoch, offset, expectedOffset);
-Assertions.assertEquals(Optional.of(expectedOffset), offset);
+Optional offset = 
metadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+assertEquals(Optional.of(expectedOffset), offset);
 }
 
 // Search for non existing leader epoch
-Optional highestOffsetForEpoch5 = 
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(5);
-Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
-} finally {
-Utils.closeQuietly(remoteLogSegmentLifecycleManager, 
"RemoteLogSegmentLifecycleManager");
+Optional highestOffsetForEpoch5 = 
metadataManager.highestOffsetForEpoch(topicIdPartition, 5);
+assertFalse(highestOffsetForEpoch5.isPresent());
 }
 }
 
-private RemoteLogSegmentMetadata 
createSegmentUpdateWithState(RemoteLogSegmentLifecycleManager 
remoteLogSegmentLifecycleManager,
-  Map segmentLeaderEpochs,
-  long 
startOffset,
-  long 
endOffset,
-  
RemoteLogSegmentState state)
-throws RemoteStorageException {
+private RemoteLogSegmentMetadata 
upsertSegmentState(RemoteLogMetadataManager metadataManager,
+Map 
segmentLeaderEpochs,
+long startOffset,
+long endOffset,
+RemoteLogSegmentState 
state)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
 RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
-RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, BROKER_ID_0,
-   
 time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
-
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
-
-RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
-Optional.empty(),
-state, BROKER_ID_1);
-
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
-
+RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+-1L, brokerId0, time.milliseconds(), segSize, 
segmentLeaderEpochs);
+metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
+
+RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId,
+time.milliseconds(), Optional.empty(), state, brokerId1);
+
metadataManager.updateRemoteLogSegmentMetadata(segMetadataUpdate).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segMetadataUpdate);
 return segmentMetadata.createWithUpdates(segMetadataUpdate);
 }
 
-private static class EpochOffset {
-final int epoch;
-final long offset;
-
-private EpochOffset(int epoch,
-long offset) {
-this.epoch = epoch;
-this.offset = offset;
-}
-
-@Override
-public boolean equals(Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-EpochOffset that = (EpochOffset) o;
-return epoch == that.epoch && offset == that.offset;
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(epoch, offset);
-}
-
-@Override
-

[jira] [Commented] (KAFKA-16879) SystemTime should use singleton mode

2024-06-03 Thread jiandu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851864#comment-17851864
 ] 

jiandu commented on KAFKA-16879:


thank you

> SystemTime should use singleton mode
> 
>
> Key: KAFKA-16879
> URL: https://issues.apache.org/jira/browse/KAFKA-16879
> Project: Kafka
>  Issue Type: Improvement
>Reporter: jiandu
>Assignee: jiandu
>Priority: Minor
>
> Currently, the {{SystemTime}} class, which provides system time-related 
> functionalities such as getting the current timestamp, sleep, and await, can be 
> instantiated multiple times.
> However, system time is unique: in an application, the time obtained in 
> different places should be consistent, and the time obtained by using the Java 
> System class to interact with the underlying layer is already the same.
> So I suggest changing it to a singleton, to reflect the uniqueness of system 
> time in the design.
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java





Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-06-03 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1625229200


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9863,8 +9863,8 @@ public void 
testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw
 
 // Newly joining member 3 bumps the group epoch. A new target 
assignment is computed.
 CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 1),
-CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, 
memberId1, assignor.targetPartitions(memberId1)),
 CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, 
memberId3, assignor.targetPartitions(memberId3)),
+CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, 
memberId1, assignor.targetPartitions(memberId1)),

Review Comment:
   Oh yes it's exactly that! Thank you so much!






Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]

2024-06-03 Thread via GitHub


gharris1727 commented on code in PR #16179:
URL: https://github.com/apache/kafka/pull/16179#discussion_r1625228386


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -30,7 +30,7 @@
  */
 public interface Time {
 
-Time SYSTEM = new SystemTime();
+Time SYSTEM = SystemTime.getInstance();

Review Comment:
   I would be inclined to keep the existing `Time.SYSTEM` and avoid the 
deprecation. SystemTime can then be package-local entirely.






[jira] [Comment Edited] (KAFKA-16879) SystemTime should use singleton mode

2024-06-03 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851862#comment-17851862
 ] 

黃竣陽 edited comment on KAFKA-16879 at 6/4/24 1:37 AM:
-

Pardon me, I missed that you want to handle this issue yourself; I will close 
my PR. I'm so sorry.


was (Author: JIRAUSER305187):
Sorry, I missed that you want to handle this issue yourself; I will close my 
PR. I'm so sorry.

> SystemTime should use singleton mode
> 
>
> Key: KAFKA-16879
> URL: https://issues.apache.org/jira/browse/KAFKA-16879
> Project: Kafka
>  Issue Type: Improvement
>Reporter: jiandu
>Assignee: jiandu
>Priority: Minor
>
> Currently, the {{SystemTime}} class, which provides system time-related 
> functionalities such as getting the current timestamp, sleep, and await, can be 
> instantiated multiple times.
> However, system time is unique: in an application, the time obtained in 
> different places should be consistent, and the time obtained by using the Java 
> System class to interact with the underlying layer is already the same.
> So I suggest changing it to a singleton, to reflect the uniqueness of system 
> time in the design.
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java





Re: [PR] KAFKA-10787: Update spotless version and remove support JDK8 [kafka]

2024-06-03 Thread via GitHub


gongxuanzhang commented on code in PR #16176:
URL: https://github.com/apache/kafka/pull/16176#discussion_r1625220935


##
build.gradle:
##
@@ -799,7 +800,7 @@ subprojects {
 skipConfigurations = [ "zinc" ]
   }
 
-  if (project.name in spotlessApplyModules) {
+  if (JavaVersion.current().isJava11Compatible() && (project.name in 
spotlessApplyModules)) {
 apply plugin: 'com.diffplug.spotless'

Review Comment:
   Yes, good idea. 






Re: [PR] KAFKA-10787: Apply spotless to transaction-coordinator and server-common [kafka]

2024-06-03 Thread via GitHub


gongxuanzhang commented on PR #16172:
URL: https://github.com/apache/kafka/pull/16172#issuecomment-2146397207

   @chia7712  complete





[jira] [Commented] (KAFKA-16879) SystemTime should use singleton mode

2024-06-03 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851862#comment-17851862
 ] 

黃竣陽 commented on KAFKA-16879:
-

Sorry, I missed that you want to handle this issue yourself; I will close my 
PR. I'm so sorry.

> SystemTime should use singleton mode
> 
>
> Key: KAFKA-16879
> URL: https://issues.apache.org/jira/browse/KAFKA-16879
> Project: Kafka
>  Issue Type: Improvement
>Reporter: jiandu
>Assignee: jiandu
>Priority: Minor
>
> Currently, the {{SystemTime}} class, which provides system time-related 
> functionalities such as getting the current timestamp, sleep, and await, can be 
> instantiated multiple times.
> However, system time is unique: in an application, the time obtained in 
> different places should be consistent, and the time obtained by using the Java 
> System class to interact with the underlying layer is already the same.
> So I suggest changing it to a singleton, to reflect the uniqueness of system 
> time in the design.
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java





Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]

2024-06-03 Thread via GitHub


m1a2st closed pull request #16179: KAFKA-16879: SystemTime should use singleton 
mode
URL: https://github.com/apache/kafka/pull/16179





Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]

2024-06-03 Thread via GitHub


m1a2st commented on code in PR #16179:
URL: https://github.com/apache/kafka/pull/16179#discussion_r1625217653


##
clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java:
##
@@ -26,6 +26,18 @@
  */
 public class SystemTime implements Time {
 
+/**
+ * use the static inner class to implement the singleton pattern
+ * to avoid the overhead of synchronization

Review Comment:
   True, `SystemTime` is very cheap, but the intent was to avoid double-checked 
locking when implementing the singleton pattern. Maybe it would be better to 
remove this comment.






Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]

2024-06-03 Thread via GitHub


m1a2st commented on code in PR #16179:
URL: https://github.com/apache/kafka/pull/16179#discussion_r1625215483


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -30,7 +30,7 @@
  */
 public interface Time {
 
-Time SYSTEM = new SystemTime();
+Time SYSTEM = SystemTime.getInstance();

Review Comment:
   You are right. I should deprecate the `Time.SYSTEM` property and use 
`SystemTime.getInstance()` instead.






Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]

2024-06-03 Thread via GitHub


junrao commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1625209745


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -305,22 +341,23 @@ public Map.Entry endOffsetFor(int 
requestedEpoch, long logEndOffs
 
 /**
  * Removes all epoch entries from the store with start offsets greater 
than or equal to the passed offset.
+ * 
+ * Checkpoint-flushing is done asynchronously.
  */
 public void truncateFromEnd(long endOffset) {
 lock.writeLock().lock();
 try {
-Optional epochEntry = latestEntry();
-if (endOffset >= 0 && epochEntry.isPresent() && 
epochEntry.get().startOffset >= endOffset) {
-List removedEntries = removeFromEnd(x -> 
x.startOffset >= endOffset);
-
-// We intentionally don't force flushing change to the device 
here because:
+List removedEntries = truncateFromEnd(epochs, 
endOffset);
+if (!removedEntries.isEmpty()) {
+// We flush the change to the device in the background because:
 // - To avoid fsync latency
 //   * fsync latency could be huge on a disk glitch, which is 
not rare in spinning drives
 //   * This method is called by ReplicaFetcher threads, which 
could block replica fetching
 // then causing ISR shrink or high produce response time 
degradation in remote scope on high fsync latency.
-// - Even when stale epochs remained in LeaderEpoch file due 
to the unclean shutdown, it will be handled by
-//   another truncateFromEnd call on log loading procedure so 
it won't be a problem
-writeToFile(false);
+// - We still flush the change in #assign synchronously, 
meaning that it's guaranteed that the checkpoint file always has no missing 
entries.
+//   * Even when stale epochs are restored from the checkpoint 
file after the unclean shutdown, it will be handled by
+// another truncateFromEnd call on log loading procedure, 
so it won't be a problem
+scheduler.scheduleOnce("leader-epoch-cache-flush-" + 
topicPartition, this::writeToFileForTruncation);

Review Comment:
   Oh, I didn't see the partition after the prefix. So, this is fine then.
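
For context, a minimal sketch, with hypothetical names, of the truncate-then-flush-asynchronously pattern described in the quoted comments (not the actual LeaderEpochFileCache code): the in-memory entries are mutated under a write lock, and the checkpoint write is handed to a background task so callers such as replica fetcher threads never block on fsync.

```java
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class EpochCacheSketch {
    private final TreeMap<Integer, Long> epochs = new TreeMap<>(); // epoch -> start offset
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Removes entries with start offsets >= endOffset; the checkpoint flush is async. */
    void truncateFromEnd(long endOffset) {
        lock.writeLock().lock();
        try {
            boolean removed = epochs.values().removeIf(start -> start >= endOffset);
            if (removed) {
                // Flush in the background to keep fsync latency off the caller.
                // Stale entries left by an unclean shutdown are truncated again
                // during log loading, so correctness is preserved.
                scheduler.execute(this::writeToFile);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    private void writeToFile() {
        // Snapshot the entries under the read lock, then write + fsync the file.
    }
}
```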






Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]

2024-06-03 Thread via GitHub


gharris1727 commented on code in PR #16179:
URL: https://github.com/apache/kafka/pull/16179#discussion_r1625208358


##
clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java:
##
@@ -26,6 +26,18 @@
  */
 public class SystemTime implements Time {
 
+/**
+ * use the static inner class to implement the singleton pattern
+ * to avoid the overhead of synchronization

Review Comment:
   Can you explain this comment? I don't know what it's referring to, as none 
of this code is synchronized.
   
   The static inner class seems to be a lazy initialization trick, which is 
ineffective here (due to Time.SYSTEM) and unneeded (since the SystemTime 
constructor is very cheap).
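
For reference, the lazy-initialization trick under discussion is the initialization-on-demand holder idiom; a standalone sketch (hypothetical names, not the PR code) looks like this. The JVM initializes the nested `Holder` class at most once and only on first use, so `getInstance()` needs no synchronization:

```java
final class SingletonSketch {

    private SingletonSketch() { }

    // The JVM guarantees Holder is initialized lazily, exactly once,
    // and in a thread-safe way, so no locking is required.
    private static final class Holder {
        static final SingletonSketch INSTANCE = new SingletonSketch();
    }

    static SingletonSketch getInstance() {
        return Holder.INSTANCE;
    }
}
```

As noted above, the benefit is limited here: `Time.SYSTEM` forces the instance to be created eagerly anyway, and the constructor is trivial.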






[jira] [Comment Edited] (KAFKA-16879) SystemTime should use singleton mode

2024-06-03 Thread jiandu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851860#comment-17851860
 ] 

jiandu edited comment on KAFKA-16879 at 6/4/24 1:07 AM:


[~m1a2st]

Sorry, you should choose issues that have no assignee or that have not been 
handled for a long time. I need to handle this issue myself.


was (Author: JIRAUSER305334):
Sorry, you should choose issues that have no assignee or that have not been 
handled for a long time. I need to handle this issue myself.

> SystemTime should use singleton mode
> 
>
> Key: KAFKA-16879
> URL: https://issues.apache.org/jira/browse/KAFKA-16879
> Project: Kafka
>  Issue Type: Improvement
>Reporter: jiandu
>Assignee: jiandu
>Priority: Minor
>
> Currently, the {{SystemTime}} class, which provides system time-related 
> functionalities such as getting the current timestamp, sleep, and await, can be 
> instantiated multiple times.
> However, system time is unique: in an application, the time obtained in 
> different places should be consistent, and the time obtained by using the Java 
> System class to interact with the underlying layer is already the same.
> So I suggest changing it to a singleton, to reflect the uniqueness of system 
> time in the design.
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java





[jira] [Commented] (KAFKA-16879) SystemTime should use singleton mode

2024-06-03 Thread jiandu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851860#comment-17851860
 ] 

jiandu commented on KAFKA-16879:


Sorry, you should choose issues that have no assignee or that have not been 
handled for a long time. I need to handle this issue myself.

> SystemTime should use singleton mode
> 
>
> Key: KAFKA-16879
> URL: https://issues.apache.org/jira/browse/KAFKA-16879
> Project: Kafka
>  Issue Type: Improvement
>Reporter: jiandu
>Assignee: jiandu
>Priority: Minor
>
> Currently, the {{SystemTime}} class, which provides system time-related 
> functionalities such as getting the current timestamp, sleep, and await, can be 
> instantiated multiple times.
> However, system time is unique: in an application, the time obtained in 
> different places should be consistent, and the time obtained by using the Java 
> System class to interact with the underlying layer is already the same.
> So I suggest changing it to a singleton, to reflect the uniqueness of system 
> time in the design.
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java





Re: [PR] KAFKA-16879: SystemTime should use singleton mode [kafka]

2024-06-03 Thread via GitHub


gharris1727 commented on code in PR #16179:
URL: https://github.com/apache/kafka/pull/16179#discussion_r1625205338


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -30,7 +30,7 @@
  */
 public interface Time {
 
-Time SYSTEM = new SystemTime();
+Time SYSTEM = SystemTime.getInstance();

Review Comment:
   Hey, is there a reason this isn't the canonical singleton instance? Why 
would we want two different ways to get the singleton `Time.SYSTEM` and 
`SystemTime.getInstance()`?






Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]

2024-06-03 Thread via GitHub


ocadaruma commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1625200232


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -305,22 +341,23 @@ public Map.Entry endOffsetFor(int 
requestedEpoch, long logEndOffs
 
 /**
  * Removes all epoch entries from the store with start offsets greater 
than or equal to the passed offset.
+ * 
+ * Checkpoint-flushing is done asynchronously.
  */
 public void truncateFromEnd(long endOffset) {
 lock.writeLock().lock();
 try {
-Optional epochEntry = latestEntry();
-if (endOffset >= 0 && epochEntry.isPresent() && 
epochEntry.get().startOffset >= endOffset) {
-List removedEntries = removeFromEnd(x -> 
x.startOffset >= endOffset);
-
-// We intentionally don't force flushing change to the device 
here because:
+List removedEntries = truncateFromEnd(epochs, 
endOffset);
+if (!removedEntries.isEmpty()) {
+// We flush the change to the device in the background because:
 // - To avoid fsync latency
 //   * fsync latency could be huge on a disk glitch, which is 
not rare in spinning drives
 //   * This method is called by ReplicaFetcher threads, which 
could block replica fetching
 // then causing ISR shrink or high produce response time 
degradation in remote scope on high fsync latency.
-// - Even when stale epochs remained in LeaderEpoch file due 
to the unclean shutdown, it will be handled by
-//   another truncateFromEnd call on log loading procedure so 
it won't be a problem
-writeToFile(false);
+// - We still flush the change in #assign synchronously, 
meaning that it's guaranteed that the checkpoint file always has no missing 
entries.
+//   * Even when stale epochs are restored from the checkpoint 
file after the unclean shutdown, it will be handled by
+// another truncateFromEnd call on log loading procedure, 
so it won't be a problem
+scheduler.scheduleOnce("leader-epoch-cache-flush-" + 
topicPartition, this::writeToFileForTruncation);

Review Comment:
   Not strictly necessary. I just followed the convention of some existing 
scheduled tasks (e.g. 
https://github.com/apache/kafka/blob/c24f94936d4370b2a3f3bfad42c56198208079b4/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L571)









Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]

2024-06-03 Thread via GitHub


ocadaruma commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1625196957


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -608,25 +613,23 @@ public boolean isCancelled() {
 }
 
 /**
- * Returns the leader epoch checkpoint by truncating with the given 
start[exclusive] and end[inclusive] offset
+ * Returns the leader epoch entries within the range of the given 
start[exclusive] and end[inclusive] offset.
+ * 
+ * Visible for testing.
  *
  * @param log The actual log from where to take the leader-epoch 
checkpoint
- * @param startOffset The start offset of the checkpoint file (exclusive 
in the truncation).
+ * @param startOffset The start offset of the epoch entries (exclusive).

Review Comment:
   My bad, that's right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers [kafka]

2024-06-03 Thread via GitHub


edoardocomar commented on PR #16151:
URL: https://github.com/apache/kafka/pull/16151#issuecomment-2146353225

   > If someone wants to raise the timeout for this one operation, I don't think that we should require them to increase the client-global request.timeout.ms.
   
   I agree to that. Hopefully the commit addresses your concerns.
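   
   For reference, a minimal sketch of what the per-operation override looks like from the caller's side (the `timeoutMs` option follows the standard `AbstractOptions` pattern; the broker address and transactional id are placeholders):
   
   ```java
   import java.util.Collections;
   import java.util.Properties;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.FenceProducersOptions;
   
   public class FenceProducersExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
           try (Admin admin = Admin.create(props)) {
               // Raise the timeout for this one operation only, leaving the
               // client-global request.timeout.ms untouched.
               FenceProducersOptions options = new FenceProducersOptions().timeoutMs(60_000);
               admin.fenceProducers(Collections.singleton("my-transactional-id"), options)
                    .all()
                    .get();
           }
       }
   }
   ```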
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10787: Apply spotless to transaction-coordinator and server-common [kafka]

2024-06-03 Thread via GitHub


chia7712 commented on PR #16172:
URL: https://github.com/apache/kafka/pull/16172#issuecomment-2146344557

   @gongxuanzhang please fix the conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]

2024-06-03 Thread via GitHub


junrao commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1625162518


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -42,10 +45,15 @@
  * 
  * Leader Epoch = epoch assigned to each leader by the controller.
  * Offset = offset of the first message in each epoch.
+ * 
+ * Note that {@link #truncateFromStart}, {@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously.

Review Comment:
   Perhaps name truncateFromStart and truncateFromEnd to sth like 
truncateFromStartAsyncFlush and truncateFromEndAsyncFlush to make it clear?



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -386,11 +442,39 @@ public OptionalInt epochForOffset(long offset) {
 }
 }
 
-public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint 
leaderEpochCheckpoint) {
+/**
+ * Returns a new LeaderEpochFileCache which contains same
+ * epoch entries with replacing backing checkpoint file.
+ * @param leaderEpochCheckpoint the new checkpoint file
+ * @return a new LeaderEpochFileCache instance
+ */
+public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpointFile 
leaderEpochCheckpoint) {
+lock.readLock().lock();
+try {
+return new LeaderEpochFileCache(epochEntries(),
+topicPartition,
+leaderEpochCheckpoint,
+scheduler);
+} finally {
+lock.readLock().unlock();
+}
+}
+
+/**
+ * Returns the leader epoch entries within the range of the given 
start[exclusive] and end[inclusive] offset
+ * @param startOffset The start offset of the epoch entries (exclusive).

Review Comment:
   From the caller's perspective, the start offset is inclusive and the end offset is exclusive.



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -305,22 +341,23 @@ public Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch, long logEndOffs
 
 /**
  * Removes all epoch entries from the store with start offsets greater 
than or equal to the passed offset.
+ * 
+ * Checkpoint-flushing is done asynchronously.
  */
 public void truncateFromEnd(long endOffset) {
 lock.writeLock().lock();
 try {
-Optional<EpochEntry> epochEntry = latestEntry();
-if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
-List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);
-
-// We intentionally don't force flushing change to the device 
here because:
+List<EpochEntry> removedEntries = truncateFromEnd(epochs, endOffset);
+if (!removedEntries.isEmpty()) {
+// We flush the change to the device in the background because:
 // - To avoid fsync latency
 //   * fsync latency could be huge on a disk glitch, which is 
not rare in spinning drives
 //   * This method is called by ReplicaFetcher threads, which 
could block replica fetching
 // then causing ISR shrink or high produce response time 
degradation in remote scope on high fsync latency.
-// - Even when stale epochs remained in LeaderEpoch file due 
to the unclean shutdown, it will be handled by
-//   another truncateFromEnd call on log loading procedure so 
it won't be a problem
-writeToFile(false);
+// - We still flush the change in #assign synchronously, 
meaning that it's guaranteed that the checkpoint file always has no missing 
entries.
+//   * Even when stale epochs are restored from the checkpoint 
file after the unclean shutdown, it will be handled by
+// another truncateFromEnd call on log loading procedure, 
so it won't be a problem
+scheduler.scheduleOnce("leader-epoch-cache-flush-" + 
topicPartition, this::writeToFileForTruncation);

Review Comment:
   Hmm, why do we need to add a trailing "-" in "leader-epoch-cache-flush-"?



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -128,15 +162,17 @@ private void maybeTruncateNonMonotonicEntries(EpochEntry 
newEntry) {
 }
 }
 
-private List<EpochEntry> removeFromEnd(Predicate<EpochEntry> predicate) {
+private static List<EpochEntry> removeFromEnd(

Review Comment:
   Could we fold this method into the caller since it only has 1 line? Ditto 
for `removeFromStart`.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -608,25 +613,23 @@ public boolean isCancelled() {
 }
 
 /**
- * Returns the leader epoch checkpoint by truncating with the given 
start[exclusive] 

[PR] KAFKA-16740: Adding skeleton code for Share Fetch and Acknowledge RPC [kafka]

2024-06-03 Thread via GitHub


apoorvmittal10 opened a new pull request, #16184:
URL: https://github.com/apache/kafka/pull/16184

   The PR adds skeleton code for the Share Fetch and Acknowledge RPCs. The changes include:
   1. Defining the RPCs in KafkaApis.scala
   2. Adding a new SharePartitionManager class which handles the RPCs
   3. Adding a SharePartition class which manages in-memory record states for fetched data
   
    Note: The new classes are in the `core` module itself as they depend on core classes like `ReplicaManager`, and hence cannot be moved to the `server` module.
   
   Subsequent PRs will follow with the implementation and test cases for the functionality.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2146294482

   @lianetm @cadonna—this is ready for another review. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16192: Introduce usage of flexible records to coordinators [kafka]

2024-06-03 Thread via GitHub


jolshan opened a new pull request, #16183:
URL: https://github.com/apache/kafka/pull/16183

   This change includes adding transaction.version (part of KIP-1022).
   
   New transaction version 1 is introduced to support writing flexible fields in transaction state log messages.
   
   Transaction version 2 is created in anticipation of further KIP-890 changes.
   
   Neither is made production-ready. Tests for the new transaction version and the new MV are created. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-03 Thread via GitHub


chia7712 commented on PR #16169:
URL: https://github.com/apache/kafka/pull/16169#issuecomment-2146219949

   > Did we mean to totally remove the null check here?
   
   Pardon me, this issue is about the "update" of cipherInformation, and this PR has fixed that.
   
   If you prefer to either keep checks or have more checks, I can file a PR to 
add consistent/explicit checks to both testing and production code.
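   
   For example, an explicit check could be as simple as (a sketch, assuming the existing `registerCipherInformation` setter):
   
   ```java
   @Override
   public void registerCipherInformation(final CipherInformation cipherInformation) {
       // fail fast instead of silently accepting a null update
       this.cipherInformation = Objects.requireNonNull(cipherInformation, "cipherInformation must not be null");
   }
   ```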


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16858: Throw DataException from validateValue on array and map schemas without inner schemas [kafka]

2024-06-03 Thread via GitHub


gharris1727 commented on PR #16161:
URL: https://github.com/apache/kafka/pull/16161#issuecomment-2146189325

   Hey @C0urante @yashmayya Could either of you PTAL? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -270,30 +326,27 @@ void testPollResultTimer() {
 void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
+
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
+
 consumerNetworkThread.runOnce();
 // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+void testEnsureEventsAreCompleted() {

Review Comment:
   Not introduced by this PR, but since we're improving here: I find that this test does not bring any value, because it's testing something that is not the responsibility of the `ConsumerNetworkThread`, so we end up testing something we're mocking ourselves with the `doAnswer`. What do you think?
   
   I had suggested removing it completely. See my original comment about it on this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877). If you agree, I would say we remove it instead of having to keep maintaining it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException

2024-06-03 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16105.
-
Resolution: Fixed

> Reassignment of tiered topics is failing due to RemoteStorageException
> --
>
> Key: KAFKA-16105
> URL: https://issues.apache.org/jira/browse/KAFKA-16105
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Reporter: Anatolii Popov
>Priority: Critical
>  Labels: tiered-storage
> Fix For: 3.8.0, 3.7.1, 3.9.0
>
>
> When partition reassignment is happening for a tiered topic, in most of the 
> cases it gets stuck with RemoteStorageExceptions on follower nodes saying that 
> it cannot construct the remote log auxiliary state:
>  
> {code:java}
> [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, 
> fetcherId=2] Error building remote log auxiliary state for test-24 
> (kafka.server.ReplicaFetcherThread)
>                                          
> org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't 
> build the state from remote store for partition: test-24, currentLeaderEpoch: 
> 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the 
> previous remote log segment metadata was not found
>                                                  at 
> kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
>                                                  at 
> kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
>                                                  at 
> kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
>                                                  at 
> scala.Option.foreach(Option.scala:437)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
>                                                  at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>                                                  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
>                                                  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
>                                                  at 
> scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
>                                                  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
>                                                  at 
> scala.Option.foreach(Option.scala:437)
>                                                  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>                                                  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
>                                                  at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>                                                  at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
>  {code}
>  
> Scenario:
> A cluster of 3 nodes with a single topic with 30 partitions. All partitions 
> have tiered segments.
> Adding 3 more nodes to the cluster and making a reassignment to move all the 
> data to new nodes.
> Behavior:
> For most of the partitions reassignment is happening smoothly.
> For some of the partitions, when a new node starts to get assignments it reads 
> the __remote_log_metadata topic and tries to initialize the metadata cache on 
> records with COPY_SEGMENT_STARTED. If it's reading such a message for the 
> partition before the partition was assigned to this specific node, it ignores 
> the message, so it skips the cache initialization and marks the partition as 
> assigned. So reassignment is stuck since 

Re: [PR] KAFKA-16105: Reset read offsets when seeking to beginning in TBRLMM [kafka]

2024-06-03 Thread via GitHub


gharris1727 commented on PR #15165:
URL: https://github.com/apache/kafka/pull/15165#issuecomment-2146174509

   Thank you @AnatolyPopov for the PR, and thanks @showuon and @kamalcph for 
your reviews!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set [kafka]

2024-06-03 Thread via GitHub


jsancio commented on code in PR #16079:
URL: https://github.com/apache/kafka/pull/16079#discussion_r1625072194


##
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##
@@ -272,20 +280,77 @@ public void testUpdateHighWatermarkQuorumSizeThree() {
 assertEquals(Optional.empty(), state.highWatermark());
 assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
 assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-assertFalse(state.updateLocalState(new LogOffsetMetadata(20L)));
+assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), 
voterSet));
 assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(20L)));
 assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
 assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(20L)));
 assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
 }
 
+@Test
+public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() {
+int node1 = 1;
+int node2 = 2;
+Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
+LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
+assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
originalVoterSet));
+assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(15L)));
+assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(10L)));
+assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+// removing node1 should not decrement HW to 10L
+Set<Integer> voterSetWithoutNode1 = mkSet(localId, node2);
+assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), 
voterSetWithoutNode1));
+assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+// HW cannot change until after node2 catches up to last HW
+assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(14L)));
+assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
+assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+// HW should update to 16L
+assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(16L)));
+assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
+}
+
+@Test
+public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() {
+int node1 = 1;
+int node2 = 2;
+Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
+LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
+assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
originalVoterSet));
+assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(15L)));
+assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(10L)));
+assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+// removing leader should not decrement HW to 10L
+Set<Integer> voterSetWithoutLeader = mkSet(node1, node2);
+assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), 
voterSetWithoutLeader));
+assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+// HW cannot change until node2 catches up to last HW

Review Comment:
   Before checking this, let's increase node1's LEO to 16 and show that it 
doesn't increase the HWM.
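   
   Something along these lines (a sketch reusing the names from the test above):
   
   ```java
   // node1 alone moving to 16 must not advance the HW, since node2
   // (the other remaining voter) is still behind the current HW of 15
   assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L)));
   assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
   ```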



##
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##
@@ -272,20 +280,77 @@ public void testUpdateHighWatermarkQuorumSizeThree() {
 assertEquals(Optional.empty(), state.highWatermark());
 assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
 assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-assertFalse(state.updateLocalState(new LogOffsetMetadata(20L)));
+assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), 
voterSet));
 assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(20L)));
 assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
 assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(20L)));
 assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
 }
 
+@Test
+public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() {

Review Comment:
   Are you planning to add a test for adding a follower to the voter set?
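   
   For reference, a rough sketch of what such a test could look like (hypothetical test, mirroring the style of the removal tests in this diff):
   
   ```java
   @Test
   public void testUpdateHighWatermarkAddingFollowerToVoterStates() {
       int node1 = 1;
       int node2 = 2;
       Set<Integer> originalVoterSet = mkSet(localId, node1);
       LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
       assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet));
       assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L)));
       assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
   
       // adding node2 must not advance the HW until it catches up to the majority
       Set<Integer> voterSetWithNode2 = mkSet(localId, node1, node2);
       assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSetWithNode2));
       assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
   }
   ```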



-- 
This is 

Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


cmccabe commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1625074887


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -306,6 +309,113 @@ class RaftManagerTest {
 
   }
 
+  class ReconfigurationTestContext(

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2146155510

   Hey @brenden20, very nice improvement! Left a few comments. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException

2024-06-03 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16105:

Fix Version/s: 3.8.0
   3.7.1
   3.9.0

> Reassignment of tiered topics is failing due to RemoteStorageException
> --
>
> Key: KAFKA-16105
> URL: https://issues.apache.org/jira/browse/KAFKA-16105
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Reporter: Anatolii Popov
>Priority: Critical
>  Labels: tiered-storage
> Fix For: 3.8.0, 3.7.1, 3.9.0
>
>
> When partition reassignment is happening for a tiered topic, in most of the 
> cases it gets stuck with RemoteStorageExceptions on follower nodes saying that 
> it cannot construct the remote log auxiliary state:
>  
> {code:java}
> [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, 
> fetcherId=2] Error building remote log auxiliary state for test-24 
> (kafka.server.ReplicaFetcherThread)
>                                          
> org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't 
> build the state from remote store for partition: test-24, currentLeaderEpoch: 
> 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the 
> previous remote log segment metadata was not found
>                                                  at 
> kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
>                                                  at 
> kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
>                                                  at 
> kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
>                                                  at 
> scala.Option.foreach(Option.scala:437)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
>                                                  at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>                                                  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
>                                                  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
>                                                  at 
> scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
>                                                  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
>                                                  at 
> scala.Option.foreach(Option.scala:437)
>                                                  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>                                                  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
>                                                  at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>                                                  at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
>  {code}
>  
> Scenario:
> A cluster of 3 nodes with a single topic with 30 partitions. All partitions 
> have tiered segments.
> Adding 3 more nodes to the cluster and making a reassignment to move all the 
> data to new nodes.
> Behavior:
> For most of the partitions reassignment is happening smoothly.
> For some of the partitions, when a new node starts to get assignments it reads 
> the __remote_log_metadata topic and tries to initialize the metadata cache on 
> records with COPY_SEGMENT_STARTED. If it's reading such a message for the 
> partition before the partition was assigned to this specific node, it ignores 
> the message, so it skips the cache initialization and marks the partition as 
> 

Re: [PR] KAFKA-16525; Dynamic KRaft network manager and channel [kafka]

2024-06-03 Thread via GitHub


cmccabe merged PR #15986:
URL: https://github.com/apache/kafka/pull/15986


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16362) Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide

2024-06-03 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16362.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide
> 
>
> Key: KAFKA-16362
> URL: https://issues.apache.org/jira/browse/KAFKA-16362
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Assignee: Ramin Gharib
>Priority: Trivial
>  Labels: newbie++
> Fix For: 3.8.0
>
>
> The implementation of KStreamKStreamJoin has several places that the compiler 
> emits warnings for, that are later suppressed or ignored:
>  * LeftOrRightValue.make returns a raw LeftOrRightValue without generic 
> arguments, because the generic type arguments depend on the boolean input.
>  * Calling LeftOrRightValue includes an unchecked cast before inserting the 
> record into the outerJoinStore
>  * emitNonJoinedOuterRecords swaps the left and right values, and performs an 
> unchecked cast
> These seem to be closely related to the isLeftSide variable, which makes the 
> class behave differently whether it is present on the left or right side of a 
> join.
> We should figure out if these warnings can be eliminated by a refactor, 
> perhaps into KStreamKstreamJoin.Left and KStreamKStreamJoin.Right, or with 
> some generic arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625060556


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -270,30 +326,27 @@ void testPollResultTimer() {
 void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
+
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
+
 consumerNetworkThread.runOnce();
 // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+void testEnsureEventsAreCompleted() {
+Cluster cluster = mock(Cluster.class);
+when(metadata.fetch()).thenReturn(cluster);
 
-@Test
-void testEnsureMetadataUpdateOnPoll() {
-MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-client.prepareMetadataUpdate(metadataResponse);
-metadata.requestUpdate(false);
-consumerNetworkThread.runOnce();
-verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-}
+List<Node> list = new ArrayList<>();
+list.add(new Node(0, "host", 0));
+when(cluster.nodes()).thenReturn(list);

Review Comment:
   what about simplifying to a single 
`when(cluster.nodes()).thenReturn(Collections.singletonList(new Node(0, "host", 
0)));` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625059147


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -289,10 +289,11 @@ private void closeInternal(final Duration timeout) {
 }
 }
 
+// Add test to see if poll() is run once with timer of 0

Review Comment:
   is this something you intend to do on this PR or separately? Either way, I 
would suggest we remove the comment and address it in this PR or have a jira to 
follow-up separately if needed please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-06-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16792:
--
Description: 
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testCurrentLag
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords
 - testResetToCommittedOffset
 - testResetUsingAutoResetPolicy

 

  was:
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 - testResetToCommittedOffset
 - testResetUsingAutoResetPolicy
 - testPollReturnsRecords

 


> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testCurrentLag
>  - testFetchStableOffsetThrowInPoll
>  - testListOffsetShouldUpdateSubscriptions
>  - testPollReturnsRecords
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625054713


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() {
 
 @Test
 void testCleanupInvokesReaper() {
+LinkedList<NetworkClientDelegate.UnsentRequest> queue = new LinkedList<>();
+when(networkClientDelegate.unsentRequests()).thenReturn(queue);

Review Comment:
   totally needed, but heads-up, there is another 
[PR](https://github.com/apache/kafka/pull/16156) in-flight where we're trying 
to change the check behind this, so we'll need to update this expectation here 
if that goes in first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-06-03 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215
 ] 

Kirk True edited comment on KAFKA-16792 at 6/3/24 9:09 PM:
---

The following don't work for reasons other than the timeout:
 - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} 
RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}}
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions: this is expecting {{poll()}} to 
call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead 
sends {{FETCH_OFFSETS}}
 - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs


was (Author: kirktrue):
These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} 
RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}}
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions: this is expecting {{poll()}} to 
call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead 
sends {{FETCH_OFFSETS}}
 - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs
 - testResetToCommittedOffset

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  - testPollReturnsRecords
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


jsancio commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1625049639


##
clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * An endpoint for a raft quorum voter.
+ */
+@InterfaceStability.Stable
+public class RaftVoterEndpoint {
+private final String name;
+private final String host;
+private final int port;
+private final String securityProtocol;

Review Comment:
   > Is the expectation that the security protocol is found by looking up the endpoint name in `listener.security.protocol.map`? What happens if it's not there? Do we fail or fall back to `PLAINTEXT`?
   
   It will use whatever is returned by `KafkaConfig.effectiveListenerSecurityProtocolMap`. Yes, it does look like that code currently returns PLAINTEXT if there is no security mapping for the controller listener.
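   
   For illustration, the explicit mapping would look something like this in the broker/controller config (listener names and protocols are placeholder values):
   
   ```properties
   # Map each listener name to a security protocol explicitly, so the
   # controller listener does not fall back to PLAINTEXT.
   listener.security.protocol.map=CONTROLLER:SSL,BROKER:PLAINTEXT
   controller.listener.names=CONTROLLER
   listeners=BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
   ```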
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-06-03 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215
 ] 

Kirk True edited comment on KAFKA-16792 at 6/3/24 9:03 PM:
---

These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} 
RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}}
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions: this is expecting {{poll()}} to 
call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead 
sends {{FETCH_OFFSETS}}
 - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs
 - testResetToCommittedOffset


was (Author: kirktrue):
These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} 
RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}}
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs
 - testResetToCommittedOffset

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  - testPollReturnsRecords
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625048146


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
 @Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+public void testApplicationEvent() {

Review Comment:
   Not introduced by this PR, but reviewing this one I noticed that we have lots of similar tests checking that runOnce processes the events (e.g. testResetPositionsEventIsProcessed, testSyncCommitEvent, testAsyncCommitEvent...), all doing the same thing (add event, run once, check it is processed). Could we maybe parametrize this and have a single test (same shape as this one but receiving the event as a param)? That should allow us to remove lots of them; in the end this component does not care about the specific events, it's just responsible for processing whatever event is added. 
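   
   For instance, a rough sketch of the parametrized version (assuming JUnit 5's `@ParameterizedTest`/`@MethodSource`, with the event types taken from the existing tests):
   
   ```java
   static Stream<ApplicationEvent> applicationEvents() {
       return Stream.of(
               new PollEvent(100),
               new NewTopicsMetadataUpdateRequestEvent());
   }
   
   @ParameterizedTest
   @MethodSource("applicationEvents")
   void testApplicationEventIsProcessed(ApplicationEvent event) {
       applicationEventsQueue.add(event);
       consumerNetworkThread.runOnce();
       verify(applicationEventProcessor).process(event);
   }
   ```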



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


jsancio commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1625041690


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -318,4 +329,28 @@ class KafkaRaftManager[T](
   override def voterNode(id: Int, listener: String): Option[Node] = {
 client.voterNode(id, listener).toScala
   }
+
+  def verifyCommonFields(
+providedClusterId: String,
+  ): Unit = {
+if (!providedClusterId.equals(clusterId)) {
+  throw new InconsistentClusterIdException("Provided cluster ID " + 
providedClusterId +

Review Comment:
   > Hmm, your URL doesn't work for me.
   
   This is the URL/code I was trying to send: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ControllerApis.scala#L147-L151



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-06-03 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1625023687


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
   + "tasks for source topics vs 
changelog topics.");
 }
 
-final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
-final Set<DefaultTaskTopicPartition> allTopicPartitions = new HashSet<>();
+final Set<DefaultTaskTopicPartition> topicsRequiringRackInfo = new HashSet<>();
+final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+final Runnable fetchRackInformation = () -> {
+if (!rackInformationFetched.get()) {
+RackUtils.annotateTopicPartitionsWithRackInfo(cluster,

Review Comment:
   very small nit (tack onto any followup PR): weird line break, either keep 
everything on one line or move the `cluster` variable to the 2nd line with the 
other params
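   
   e.g.:
   
   ```java
   RackUtils.annotateTopicPartitionsWithRackInfo(
       cluster, internalTopicManager, topicsRequiringRackInfo);
   ```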



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
   + "tasks for source topics vs 
changelog topics.");
 }
 
-final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
-final Set<DefaultTaskTopicPartition> allTopicPartitions = new HashSet<>();
+final Set<DefaultTaskTopicPartition> topicsRequiringRackInfo = new HashSet<>();
+final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+final Runnable fetchRackInformation = () -> {
+if (!rackInformationFetched.get()) {
+RackUtils.annotateTopicPartitionsWithRackInfo(cluster,
+internalTopicManager, topicsRequiringRackInfo);
+rackInformationFetched.set(true);
+}
+};
+
 final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsForTask = new HashMap<>();
+final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
 logicalTaskIds.forEach(taskId -> {
 final Set<TaskTopicPartition> topicPartitions = new HashSet<>();
 
 for (final TopicPartition topicPartition : 
sourcePartitionsForTask.get(taskId)) {
 final boolean isSource = true;
 final boolean isChangelog = 
changelogPartitionsForTask.get(taskId).contains(topicPartition);
 final DefaultTaskTopicPartition racklessTopicPartition = new 
DefaultTaskTopicPartition(
-topicPartition, isSource, isChangelog, null);
-allTopicPartitions.add(racklessTopicPartition);
+topicPartition, isSource, isChangelog, 
fetchRackInformation);
+topicsRequiringRackInfo.add(racklessTopicPartition);
 topicPartitions.add(racklessTopicPartition);
 }
 
 for (final TopicPartition topicPartition : 
changelogPartitionsForTask.get(taskId)) {
 final boolean isSource = 
sourcePartitionsForTask.get(taskId).contains(topicPartition);
 final boolean isChangelog = true;
 final DefaultTaskTopicPartition racklessTopicPartition = new 
DefaultTaskTopicPartition(
-topicPartition, isSource, isChangelog, null);
-allTopicPartitions.add(racklessTopicPartition);
+topicPartition, isSource, isChangelog, 
fetchRackInformation);
+if (publicAssignmentConfigs.numStandbyReplicas() > 0) {

Review Comment:
   Note that active tasks will also read from changelog topics (though only 
during the restore phase), so we should be adding changelogs to the 
`topicsRequiringRackInfo` set even if there are no standbys configured
   
   Again you can tack this onto PR #17 or whatever PR is next



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -573,14 +586,24 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
 ));
 
 return new DefaultApplicationState(
-assignmentConfigs.toPublicAssignmentConfigs(),
+publicAssignmentConfigs,
 logicalTasks,
 clientMetadataMap
 );
 }
 
-private static void processStreamsPartitionAssignment(final Map<UUID, ClientMetadata> clientMetadataMap,
-  final TaskAssignment 
taskAssignment) {
+private void processStreamsPartitionAssignment(final 
org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+   final TaskAssignment 
taskAssignment,
+   final AssignmentError 
assignmentError,
+  

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625034602


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong())));
+verify(networkClientDelegate).poll(anyLong(), anyLong());

Review Comment:
   This test seems to leave out one of the core actions of the `runOnce`:
   
   1. poll managers (this generates the requests) -> covered by the test
   2. newly generated requests are added to the network client -> not covered
   3. poll network client to send the requests -> covered by the test
   
   Step 3 wouldn't have the effect we want (send the request) if step 2 does 
not happen, so what about adding a `verify(networkClientDelegate).addAll(..` 
before verifying the network client poll, to make sure we cover the whole flow 
of how requests move from the managers to the network layer on run once? 
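   Sketch of the suggested extra check (assuming `addAll` is the overload that takes the list of unsent requests):

       consumerNetworkThread.runOnce();
       // step 2: newly generated requests are handed to the network client
       verify(networkClientDelegate).addAll(anyList());
       // step 3: the network client is polled to actually send them
       verify(networkClientDelegate).poll(anyLong(), anyLong());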



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625032096


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong())));

Review Comment:
   I think we're missing the stubbing of `requestManagers.entries()` to make sure there are some managers and we actually verify something, so we need `when(requestManagers.entries()).thenReturn(..` with some managers.
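   Something along these lines (a sketch; the manager type and the `entries()` return type are assumptions):

       final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class);
       final List<Optional<? extends RequestManager>> entries =
           Collections.<Optional<? extends RequestManager>>singletonList(Optional.of(commitRequestManager));
       when(requestManagers.entries()).thenReturn(entries);

       consumerNetworkThread.runOnce();
       verify(commitRequestManager).poll(anyLong());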



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16105: Reset read offsets when seeking to beginning in TBRLMM [kafka]

2024-06-03 Thread via GitHub


gharris1727 merged PR #15165:
URL: https://github.com/apache/kafka/pull/15165


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16884) Refactor RemoteLogManagerConfig with a Builder

2024-06-03 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851809#comment-17851809
 ] 

Muralidhar Basani commented on KAFKA-16884:
---

[~gharris1727] agree with you about aligning it with other Config classes.

As suggested, we aim to make the following changes:
 * Extend AbstractConfig instead of accepting one as an argument
 * Avoid the use of a mutable ConfigDef

> Refactor RemoteLogManagerConfig with a Builder
> --
>
> Key: KAFKA-16884
> URL: https://issues.apache.org/jira/browse/KAFKA-16884
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Muralidhar Basani
>Assignee: Muralidhar Basani
>Priority: Minor
>
> The {{RemoteLogManagerConfig}} class has a very large constructor with 27 
> fields, making it difficult to read and work with. Introducing a builder 
> class would be beneficial.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them [kafka]

2024-06-03 Thread via GitHub


C0urante commented on PR #16122:
URL: https://github.com/apache/kafka/pull/16122#issuecomment-2146088065

   Thanks @mimaison and apologies for the failing test. This should be ready 
now!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-06-03 Thread via GitHub


C0urante commented on PR #16053:
URL: https://github.com/apache/kafka/pull/16053#issuecomment-2146086988

   Thanks @gharris1727! I've updated the patch to apply to both distributed and 
standalone mode, and fixed failing unit tests.
   
   I've also tweaked the logic so that applied connector configs are tracked 
regardless of whether the worker has completed startup or not, which should 
eliminate unnecessary cluster churn on upgrades. This still "primes the pump" 
for future connector config updates though, which I believe should make us both 
very happy!
   
   Going through the scenarios you listed in your analysis (which was correct 
based on the implementation you reviewed):
   
   - The first one (steps 1-3) should never occur now. If a connector has 
generated task configs after its latest configuration was submitted by the 
user, the worker should have an applied config for it; if the connector has not 
generated task configs, then we should force a reconfiguration attempt, just 
like we would today.
   - The second one (steps 4-6) should remain correct ("Rebalancing a connector 
among existing workers in a cluster won't force new task configs to be 
generated"), because workers must be caught up on the config topic before 
joining a group, and before resuming work after rejoining a group
   - The third one (steps 7-8) should remain as correct as it was in the 
original implementation. We eagerly transform applied connector configs as soon 
as the corresponding set of task configs is generated, which does mean that 
there may be some lag time between when the connector config was submitted and 
when we performed the transform. Ultimately though, there is an eventual 
consistency-esque guarantee that if a config provider changes periodically, new 
task configs will at some point be forcibly written to the config topic.
   - The fourth one (steps 9-11) should remain correct as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-03 Thread via GitHub


jolshan commented on PR #16169:
URL: https://github.com/apache/kafka/pull/16169#issuecomment-2146083639

   Did we mean to totally remove the null check here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16105: Reassignment fix [kafka]

2024-06-03 Thread via GitHub


gharris1727 commented on PR #15165:
URL: https://github.com/apache/kafka/pull/15165#issuecomment-2146082430

   Test failures appear unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16884) Refactor RemoteLogManagerConfig with a Builder

2024-06-03 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851807#comment-17851807
 ] 

Greg Harris commented on KAFKA-16884:
-

I don't think the builder pattern is a good fit for configuration classes like this one; it would be a deviation in form from all other config classes.

I see two odd things about the RLMC that may contribute to this: it doesn't extend AbstractConfig, and instead accepts one as an argument; and it exposes its (mutable!) CONFIG_DEF to be defined directly in KafkaConfig. I have not seen that before, it's very unusual, and mutable ConfigDefs have been phased out in Connect.

If the large constructor is truly a problem (which I'm not convinced of, as there are only 2 call-sites for it right now) then I think it would be reasonable to solve it by bringing the whole class into alignment with other Config classes, rather than switching to the builder pattern.
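For illustration, a rough sketch of that shape (one key shown; names, defaults, and docs are illustrative, not the actual class):

    import java.util.Map;
    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;

    public class RemoteLogManagerConfig extends AbstractConfig {
        public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable";

        // immutable ConfigDef owned by this class instead of being assembled in KafkaConfig
        private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, ConfigDef.Type.BOOLEAN, false,
                    ConfigDef.Importance.MEDIUM, "Whether remote log storage is enabled.");

        public RemoteLogManagerConfig(final Map<?, ?> props) {
            super(CONFIG_DEF, props);
        }

        // accessors read from AbstractConfig, so no 27-argument constructor is needed
        public boolean enableRemoteStorageSystem() {
            return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
        }
    }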

> Refactor RemoteLogManagerConfig with a Builder
> --
>
> Key: KAFKA-16884
> URL: https://issues.apache.org/jira/browse/KAFKA-16884
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Muralidhar Basani
>Assignee: Muralidhar Basani
>Priority: Minor
>
> The {{RemoteLogManagerConfig}} class has a very large constructor with 27 
> fields, making it difficult to read and work with. Introducing a builder 
> class would be beneficial.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


cmccabe commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1625013334


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -1080,4 +1083,39 @@ class ControllerApis(
 requestThrottleMs => new 
AssignReplicasToDirsResponse(reply.setThrottleTimeMs(requestThrottleMs)))
 }
   }
+
+  def handleAddRaftVoter(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+authHelper.authorizeClusterOperation(request, ALTER)
+val addRequest = request.body[AddRaftVoterRequest]
+raftManager.handleAddVoter(addRequest.data())
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+  new AddRaftVoterResponse(new AddRaftVoterResponseData().
+setThrottleTimeMs(requestThrottleMs).
+setErrorCode(0.toShort).
+setErrorMessage(null)))
+CompletableFuture.completedFuture[Unit](())

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


cmccabe commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1625013008


##
clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * An endpoint for a raft quorum voter.
+ */
+@InterfaceStability.Stable
+public class RaftVoterEndpoint {
+private final String name;
+private final String host;
+private final int port;
+private final String securityProtocol;

Review Comment:
   > Did you consider using o.a.k.c.Endpoints? This struct has a similar 
structure to Endpoints.
   
   `org.apache.kafka.common.Endpoint` is not intended to be used as part of the 
Admin Client API. That class drags in the non-public class 
`org.apache.kafka.common.security.auth.SecurityProtocol`. Additionally we may 
want to change it in the future, which we could not do if it were part of the 
admin API.
   
   (`org.apache.kafka.common.Endpoint` did get dragged into the Authorizer API, 
which I think was a mistake, but that's a separate discussion. And not many 
people have ever written Authorizers, so changing that stuff is not impossible)
   
   > During the KIP discussion we agreed to not add security protocol to the 
AddRaftVoter request. Mainly because it is not needed and that there is a 
requirement that the listener name, security protocol and the protocol 
configuration must match in all of the voters if they exist.
   
   Is the expectation that the security protocol is found by looking up the 
endpoint name in `listener.security.protocol.map`? What happens if it's not 
there? Do we fail or fall back to `PLAINTEXT`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


cmccabe commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1625003964


##
core/src/test/resources/log4j.properties:
##
@@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=WARN
-log4j.logger.org.apache.kafka=WARN
+log4j.logger.kafka=OFF
+log4j.logger.org.apache.kafka=OFF

Review Comment:
   Sorry, will fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16884) Refactor RemoteLogManagerConfig with a Builder

2024-06-03 Thread Muralidhar Basani (Jira)
Muralidhar Basani created KAFKA-16884:
-

 Summary: Refactor RemoteLogManagerConfig with a Builder
 Key: KAFKA-16884
 URL: https://issues.apache.org/jira/browse/KAFKA-16884
 Project: Kafka
  Issue Type: Improvement
Reporter: Muralidhar Basani
Assignee: Muralidhar Basani


The {{RemoteLogManagerConfig}} class has a very large constructor with 27 
fields, making it difficult to read and work with. Introducing a builder class 
would be beneficial.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


cmccabe commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1625003607


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -318,4 +329,28 @@ class KafkaRaftManager[T](
   override def voterNode(id: Int, listener: String): Option[Node] = {
 client.voterNode(id, listener).toScala
   }
+
+  def verifyCommonFields(
+providedClusterId: String,
+  ): Unit = {
+if (!providedClusterId.equals(clusterId)) {
+  throw new InconsistentClusterIdException("Provided cluster ID " + 
providedClusterId +

Review Comment:
   > If we throw here, this will get caught here and print an error message, 
right?
   
https://github.com/apache/kafka/pull/16058/files#diff-91060c918c99d25342f625c146f14425716eda9d8fcfe1126b2c45feff388362R152-R154
   
   Hmm, your URL doesn't work for me. As a general matter, exceptions not 
otherwise handled in ControllerApis.scala will just invoke 
`AbstractRequest.getErrorResponse`.
   
   > How about for now we just complete the requests with the unsupported 
version error responses?
   
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-06-03 Thread via GitHub


C0urante commented on PR #16001:
URL: https://github.com/apache/kafka/pull/16001#issuecomment-2146046642

   Closing in favor of https://github.com/apache/kafka/pull/16053, which is 
simpler, more effective, and requires no changes to the config topic or 
internal REST API.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-06-03 Thread via GitHub


C0urante closed pull request #16001: KAFKA-9228: Restart tasks on runtime-only 
connector config changes
URL: https://github.com/apache/kafka/pull/16001


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624997659


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -747,21 +748,19 @@ public void testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() {
 new TopicPartition("topic", 1),
 new OffsetAndMetadata(0));
 
-// Send sync offset commit request that fails with retriable error.
-long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
-CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
-completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);
+// Send sync offset commit request.
+long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
+CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
 
-// Request retried after backoff, and fails with retriable again. 
Should not complete yet
+// Make the first request fail with a retriable error. Should not 
complete yet
 // given that the request timeout hasn't expired.
 time.sleep(retryBackoffMs);
 completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);
 assertFalse(commitResult.isDone());
 
 // Sleep to expire the request timeout. Request should fail on the 
next poll.
 time.sleep(retryBackoffMs);
-NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
-assertEquals(0, res.unsentRequests.size());
+completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);

Review Comment:
   This code is reverted to its original form now that we prune expired requests that have been attempted at least once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624996705


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -210,7 +194,7 @@ private NetworkClientDelegate.UnsentRequest 
createUnsentRequest(
 private void handleError(final Throwable exception,
  final long completionTimeMs) {
 if (exception instanceof RetriableException) {
-if (completionTimeMs >= expirationTimeMs) {
+if (isExpired()) {

Review Comment:
   I refactored the code so that `handleResponse()` calls `handleError()` if it 
hits an error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


cmccabe commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1624993551


##
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##
@@ -1320,6 +1320,16 @@ public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMet
 return new ListClientMetricsResourcesResult(future);
 }
 
+@Override
+public AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set<RaftVoterEndpoint> endpoints, AddRaftVoterOptions options) {
+throw new UnsupportedOperationException("Not implemented yet");
+}
+
+@Override
+public RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid 
voterDirectoryId, RemoveRaftVoterOptions options) {
+throw new UnsupportedOperationException("Not implemented yet");
+}

Review Comment:
   Maybe not. But Java requires them to be in MockAdminClient so that it will 
typecheck.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624992826


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -61,30 +62,28 @@
  */
 
 public class TopicMetadataRequestManager implements RequestManager {
+private final Time time;
 private final boolean allowAutoTopicCreation;
 private final List<TopicMetadataRequestState> inflightRequests;
+private final int requestTimeoutMs;
 private final long retryBackoffMs;
 private final long retryBackoffMaxMs;
 private final Logger log;
 private final LogContext logContext;
 
-public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
+public TopicMetadataRequestManager(final LogContext context, final Time 
time, final ConsumerConfig config) {
 logContext = context;
 log = logContext.logger(getClass());
+this.time = time;
 inflightRequests = new LinkedList<>();
+requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
 allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
 }
 
 @Override
 public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        // Prune any requests which have timed out
-        List<TopicMetadataRequestState> expiredRequests = inflightRequests.stream()
-            .filter(req -> req.isExpired(currentTimeMs))
-            .collect(Collectors.toList());
-        expiredRequests.forEach(TopicMetadataRequestState::expire);
-

Review Comment:
   I've updated the logic in `CommitRequestManager` and 
`TopicMetadataRequestManager` to proactively prune expired `RequestState`s that 
have had at least one request.
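   Roughly this shape in `poll(currentTimeMs)` (a sketch; the attempt-count check via the inherited `numAttempts` field is an assumption):

       // Only prune states that have already been attempted at least once; an expired
       // state that has never been sent still gets one attempt before failing.
       List<TopicMetadataRequestState> expiredRequests = inflightRequests.stream()
           .filter(req -> req.numAttempts > 0 && req.isExpired(currentTimeMs))
           .collect(Collectors.toList());
       expiredRequests.forEach(TopicMetadataRequestState::expire);
       inflightRequests.removeAll(expiredRequests);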



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


cmccabe commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1624991685


##
clients/src/main/resources/common/message/AddRaftVoterRequest.json:
##
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 80,
+  "type": "request",
+  "listeners": ["controller", "broker"],
+  "name": "AddRaftVoterRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ClusterId", "type": "string", "versions": "0+" },
+{ "name": "TimeoutMs", "type": "int32", "versions": "0+" },
+{ "name": "VoterId", "type": "int32", "versions": "0+",
+  "about": "The replica id of the voter getting added to the topic 
partition" },
+{ "name": "VoterDirectoryId", "type": "uuid", "versions": "0+",
+  "about": "The directory id of the voter getting added to the topic 
partition" },

Review Comment:
   Right. And it would be easy to update the RPCs to have an optional topic 
structure, if and when we need one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624957099


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState {
  */
 abstract CompletableFuture future();
 
-/**
- * Complete the request future with a TimeoutException if the request 
timeout has been
- * reached, based on the provided current time.
- */
-void maybeExpire(long currentTimeMs) {
-if (retryTimeoutExpired(currentTimeMs)) {
-removeRequest();
-isExpired = true;
-future().completeExceptionally(new 
TimeoutException(requestDescription() +
-" could not complete before timeout expired."));
-}
-}
-
 /**
  * Build request with the given builder, including response handling 
logic.
  */
 NetworkClientDelegate.UnsentRequest 
buildRequestWithResponseHandling(final AbstractRequest.Builder builder) {
 NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator());
+coordinatorRequestManager.coordinator(),
+time.timer(requestTimeoutMs)
+);
 request.whenComplete(
 (response, throwable) -> {
 long currentTimeMs = request.handler().completionTimeMs();

Review Comment:
   agree that it's confusing, but for the record, I guess the current in the 
name may come from the point of view that here's the moment a request 
completes, and we retrieve the completion time, so could I could see it as the 
current because of where it's called (but still +1 for better name, simply 
`completionTimeMs`, that btw aligns with the `handleClientResponse` func param 
where it's used right below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set [kafka]

2024-06-03 Thread via GitHub


jsancio commented on code in PR #16079:
URL: https://github.com/apache/kafka/pull/16079#discussion_r1624780684


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -341,9 +346,18 @@ public boolean updateReplicaState(
 state.nodeId, currentEndOffset.offset, 
fetchOffsetMetadata.offset);
 }
 });
-
-Optional leaderEndOffsetOpt =
-voterStates.get(localId).endOffset;
+Optional leaderEndOffsetOpt;
+ReplicaState leaderVoterState = voterStates.get(localId);
+ReplicaState leaderObserverState = observerStates.get(localId);
+if (leaderVoterState != null) {
+leaderEndOffsetOpt = leaderVoterState.endOffset;
+} else if (leaderObserverState != null) {
+// The leader is not guaranteed to be in the voter set when in the 
process of being removed from the quorum.
+log.info("Updating end offset for leader {} which is also an 
observer.", localId);
+leaderEndOffsetOpt = leaderObserverState.endOffset;
+} else {
+throw new IllegalStateException("Leader state not found for 
localId " + localId);
+}

Review Comment:
   In practice, isn't this the same as `getOrCreateReplicaState`? If so, we should just use that function here.
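   i.e. something like (sketch):

       Optional<LogOffsetMetadata> leaderEndOffsetOpt = getOrCreateReplicaState(localId).endOffset;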



##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -341,9 +346,18 @@ public boolean updateReplicaState(
 state.nodeId, currentEndOffset.offset, 
fetchOffsetMetadata.offset);
 }
 });
-
-        Optional<LogOffsetMetadata> leaderEndOffsetOpt =
-            voterStates.get(localId).endOffset;
+        Optional<LogOffsetMetadata> leaderEndOffsetOpt;

Review Comment:
   If you move this right before the `if` statement and mark it as final, it makes it more obvious that this variable can take two different values.



##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -445,6 +463,27 @@ private boolean isVoter(int remoteNodeId) {
 return voterStates.containsKey(remoteNodeId);
 }
 
+    // with Jose's changes this will probably make more sense as VoterSet
+    private void updateVoterSet(Set<Integer> lastVoterSet) {
+        // Remove any voter state that is not in the last voter set. They become observers.
+        for (Iterator<Map.Entry<Integer, ReplicaState>> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) {
+            Integer nodeId = iter.next().getKey();
+            if (!lastVoterSet.contains(nodeId)) {
+                createObserverState(nodeId);
+                iter.remove();
+            }
+        }
+
+        // Add any voter state that is in the last voter set but not in the current voter set. They are removed from
+        // the observerStates if they exist.
+        for (int voterId : lastVoterSet) {
+            if (!voterStates.containsKey(voterId)) {
+                voterStates.put(voterId, new ReplicaState(voterId, false));
+                observerStates.remove(voterId);

Review Comment:
   This comment applies to this code block and the one above.
   
   When moving a replica state from voter to observer and vice versa, we shouldn't create a new replica state but instead reuse the replica state from the previous hash map.
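   A sketch of that reuse, inside the two loops from the diff (keeping whatever fetch progress the state already holds):

       // voter -> observer: move the existing state instead of creating a fresh one
       final Map.Entry<Integer, ReplicaState> entry = iter.next();
       if (!lastVoterSet.contains(entry.getKey())) {
           observerStates.put(entry.getKey(), entry.getValue());
           iter.remove();
       }

       // observer -> voter: reuse the observer state if present
       if (!voterStates.containsKey(voterId)) {
           final ReplicaState previous = observerStates.remove(voterId);
           voterStates.put(voterId, previous != null ? previous : new ReplicaState(voterId, false));
       }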



##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -445,6 +463,27 @@ private boolean isVoter(int remoteNodeId) {
 return voterStates.containsKey(remoteNodeId);
 }
 
+    // with Jose's changes this will probably make more sense as VoterSet
+    private void updateVoterSet(Set<Integer> lastVoterSet) {
+        // Remove any voter state that is not in the last voter set. They become observers.
+        for (Iterator<Map.Entry<Integer, ReplicaState>> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) {
+            Integer nodeId = iter.next().getKey();
+            if (!lastVoterSet.contains(nodeId)) {
+                createObserverState(nodeId);

Review Comment:
   The code in `clearInactiveObservers` removes any `ReplicaState` in `observerStates` that hasn't been updated after some time. We should make sure that the local replica (the leader) is not removed from `observerStates` if it exists.
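   For example, guarding the eviction on the local id (a sketch; the timestamp field and timeout constant are assumptions about the `clearInactiveObservers` internals):

       // never evict the leader's own observer state, even if it looks inactive
       observerStates.entrySet().removeIf(entry ->
           entry.getKey() != localId
               && currentTimeMs - entry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS);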



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs [kafka]

2024-06-03 Thread via GitHub


jsancio commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1624911276


##
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##
@@ -1320,6 +1320,16 @@ public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMet
 return new ListClientMetricsResourcesResult(future);
 }
 
+@Override
+public AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set<RaftVoterEndpoint> endpoints, AddRaftVoterOptions options) {
+throw new UnsupportedOperationException("Not implemented yet");
+}
+
+@Override
+public RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid 
voterDirectoryId, RemoveRaftVoterOptions options) {
+throw new UnsupportedOperationException("Not implemented yet");
+}

Review Comment:
   I suspect that we will never implement these. Do you agree?



##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -1080,4 +1083,39 @@ class ControllerApis(
 requestThrottleMs => new 
AssignReplicasToDirsResponse(reply.setThrottleTimeMs(requestThrottleMs)))
 }
   }
+
+  def handleAddRaftVoter(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+authHelper.authorizeClusterOperation(request, ALTER)
+val addRequest = request.body[AddRaftVoterRequest]
+raftManager.handleAddVoter(addRequest.data())
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+  new AddRaftVoterResponse(new AddRaftVoterResponseData().
+setThrottleTimeMs(requestThrottleMs).
+setErrorCode(0.toShort).
+setErrorMessage(null)))
+CompletableFuture.completedFuture[Unit](())

Review Comment:
   I think this is going to change in the real implementation. For KRaft 
requests we simply forward them to the raft thread/actor using 
`handleRaftRequest` and `raftManager.handleRequest`. 
   
   How about for now we just send an error response with "unsupported version" 
and return a completed future?
   
   This comment applies to all of the new methods added to this type.



##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -318,4 +329,28 @@ class KafkaRaftManager[T](
   override def voterNode(id: Int, listener: String): Option[Node] = {
 client.voterNode(id, listener).toScala
   }
+
+  def verifyCommonFields(
+providedClusterId: String,
+  ): Unit = {
+if (!providedClusterId.equals(clusterId)) {
+  throw new InconsistentClusterIdException("Provided cluster ID " + 
providedClusterId +

Review Comment:
   This applies to all of the `throw new` added.
   
   If we throw here, this will get caught here and print an error message, 
right?
   
https://github.com/apache/kafka/pull/16058/files#diff-91060c918c99d25342f625c146f14425716eda9d8fcfe1126b2c45feff388362R152-R154
   
   How about for now we just complete the requests with the unsupported version 
error responses?
   
   Also, I have to double check, but I think cluster ids are checked by the `KafkaRaftClient` and not the `KafkaRaftManager`. We have this abstraction so we can test that behavior in the `KafkaRaftClientTest` suite, or in a new test suite if we add one for this functionality.
   
   Most of our interesting tests are against `KafkaRaftClient`. The tests for `KafkaRaftManager` are more basic and limited.



##
clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * An endpoint for a raft quorum voter.
+ */
+@InterfaceStability.Stable
+public class RaftVoterEndpoint {
+private final String name;
+private final String host;
+private final int port;
+private final String securityProtocol;

Review Comment:
   Did you consider using `o.a.k.c.Endpoints`? This struct has a similar 
structure to `Endpoints`.
   
   During the KIP discussion we agreed to not add security protocol to the `AddRaftVoter` request. Mainly because it is not needed and that there is a requirement that the listener name, security protocol and the protocol configuration must match in all of the voters if they exist.

Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624788878


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -235,7 +235,6 @@ public void addAll(final List<UnsentRequest> requests) {
 
 public void add(final UnsentRequest r) {
 Objects.requireNonNull(r);
-r.setTimer(this.time, this.requestTimeoutMs);

Review Comment:
   It _shouldn't_. My preference is to make data as immutable as possible. 
Setting the `Timer` on creation vs. after the fact seems cleaner.
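   i.e. the shape already shown in the diff above (sketch):

       // immutable construction: the timer travels with the request from the start
       NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
           builder,
           coordinatorRequestManager.coordinator(),
           time.timer(requestTimeoutMs));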



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Use project path instead of name for spotlessApplyModules in Gradle script [kafka]

2024-06-03 Thread via GitHub


C0urante merged PR #16177:
URL: https://github.com/apache/kafka/pull/16177


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: update / add documentations stream-state-metrics [kafka]

2024-06-03 Thread via GitHub


sebastienviale opened a new pull request, #16182:
URL: https://github.com/apache/kafka/pull/16182

   This change updates the documentation for all-latency-max, range-latency-avg, and range-latency-max, and adds documentation for prefix-scan.
   
   It clarifies that the measured execution time of the all operation is in fact the time the iterator is in use.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching

2024-06-03 Thread Arushi Helms (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851781#comment-17851781
 ] 

Arushi Helms commented on KAFKA-16820:
--

Hi [~soarez] 
Do you have any update on this? 

> Kafka Broker fails to connect to Kraft Controller with no DNS matching 
> ---
>
> Key: KAFKA-16820
> URL: https://issues.apache.org/jira/browse/KAFKA-16820
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0, 3.6.1, 3.8.0
>Reporter: Arushi Helms
>Priority: Major
> Attachments: Screenshot 2024-05-22 at 1.09.11 PM-1.png
>
>
>  
> We are migrating our Kafka cluster from ZooKeeper to KRaft mode. We are 
> running individual brokers and controllers with TLS enabled, and IPs are used 
> for communication. 
> TLS enabled setup works fine among the brokers and the certificate looks 
> something like:
> {noformat}
> Common Name: *.kafka.service.consul
> Subject Alternative Names: *.kafka.service.consul, IP 
> Address:10.87.170.78{noformat}
> Note:
>  * The DNS name for the node does not match the CN, but since we are using IPs 
> for communication, we have provided IPs as SANs.
>  * Same with the controllers, IPs are given as SAN in the certificate.
>  * The issue is not related to the migration, so we are only sharing the 
> configuration relevant to the TLS piece. 
> In the current setup I am running 3 brokers and 3 controllers. 
> *CONTROLLER:*
> Relevant controller configurations from one of the controllers:
> {noformat}
> KAFKA_CFG_PROCESS_ROLES=controller 
> KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow
> KAFKA_CFG_NODE_ID=6 
> KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
>  
> KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER 
> KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL
> KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL 
> KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat}
> Controller certificate has:
> {noformat}
> Common Name: *.kafka.service.consul 
> Subject Alternative Names: *.kafka.service.consul, IP 
> Address:10.87.170.6{noformat}
>  
> *BROKER:*
> Relevant broker configuration from one of the brokers:
> {noformat}
> KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
> KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL
> KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
>  
> KAFKA_CFG_PROCESS_ROLES=broker 
> KAFKA_CFG_NODE_ID=3 
> KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL
>  
> KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096
>  
> KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat}
> Broker certificate has:
> {noformat}
> Common Name: *.kafka.service.consul
> Subject Alternative Names: *.kafka.service.consul, IP 
> Address:10.87.170.78{noformat}
>  
> ISSUE 1: 
> With this setup, the Kafka broker fails to connect to the controller; see the 
> following error:
> {noformat}
> 2024-05-22 17:53:46,413] ERROR 
> [broker-2-to-controller-heartbeat-channel-manager]: Request 
> BrokerRegistrationRequestData(brokerId=2, clusterId='5kztjhJ4SxSu-kdiEYDUow', 
> incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', 
> host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', 
> host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', 
> host='10.87.170.81', port=9092, securityProtocol=0), 
> Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, 
> securityProtocol=1)], features=[Feature(name='metadata.version', 
> minSupportedVersion=1, maxSupportedVersion=19)], rack=null, 
> isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], 
> previousBrokerEpoch=-1) failed due to authentication error with controller 
> (kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException:
>  SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No 
> subject alternative DNS name matching 
> cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found.at 
> java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)  at 
> java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:378) 
> at 
> java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321) 
> at 
> java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:316) 
> at 
> java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351)
>   at 
> java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226)
>   at 
> 

Re: [PR] KAFKA-16861: Don't convert to group to classic if the size is larger than group max size. [kafka]

2024-06-03 Thread via GitHub


dajac commented on PR #16163:
URL: https://github.com/apache/kafka/pull/16163#issuecomment-2145877889

   Merged to trunk and to 3.8.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Small refactor in TargetAssignmentBuilder [kafka]

2024-06-03 Thread via GitHub


dajac commented on PR #16174:
URL: https://github.com/apache/kafka/pull/16174#issuecomment-2145877229

   Merged to trunk and to 3.8.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


