Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on code in PR #15911:
URL: https://github.com/apache/kafka/pull/15911#discussion_r1596287627


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java:
##
@@ -49,9 +45,10 @@ public class RemoteLogMetadataManagerTest {
 
 private final Time time = new MockTime(1);
 
-@ParameterizedTest(name = "remoteLogMetadataManager = {0}")
-@MethodSource("remoteLogMetadataManagers")
-public void testFetchSegments(RemoteLogMetadataManager 
remoteLogMetadataManager) throws Exception {
+private RemoteLogMetadataManager remoteLogMetadataManager = new 
TopicBasedRemoteLogMetadataManagerWrapperWithHarness();
+
+@Test
+public void testFetchSegments() throws Exception {
 try {

Review Comment:
   It seems to me that `TopicBasedRemoteLogMetadataManagerWrapperWithHarness` can 
also be removed, since we don't actually use the wrapper. This test can be 
rewritten in the following style:
   
   ```java
   @Test
   public void testFetchSegments() throws Exception {
   try (TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness()) {
   RemoteLogMetadataManager remoteLogMetadataManager = 
remoteLogMetadataManagerHarness.remoteLogMetadataManager();
   ```
   
   Note that `TopicBasedRemoteLogMetadataManagerHarness` needs to implement 
`AutoCloseable`, and `remoteLogMetadataManager` should be a public method.
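
A minimal sketch of the pattern suggested above, under stated assumptions: `DemoHarness` and `DemoMetadataManager` are hypothetical placeholders, not the real `TopicBasedRemoteLogMetadataManagerHarness` or `RemoteLogMetadataManager`. It only illustrates how implementing `AutoCloseable` lets a test rely on try-with-resources for cleanup.

```java
// Hypothetical stand-in for the metadata manager; not the Kafka interface.
class DemoMetadataManager {
    private boolean closed = false;

    boolean isClosed() {
        return closed;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical stand-in for the harness. Implementing AutoCloseable is what
// allows it to appear in a try-with-resources header.
class DemoHarness implements AutoCloseable {
    private final DemoMetadataManager manager = new DemoMetadataManager();

    // Public accessor, mirroring the "should be a public method" remark.
    public DemoMetadataManager remoteLogMetadataManager() {
        return manager;
    }

    @Override
    public void close() {
        // Release whatever the harness owns; here that is only the manager.
        manager.close();
    }
}

class HarnessDemo {
    // Runs a "test body" against the harness and returns the manager so the
    // caller can check that it was closed when the try block exited.
    static DemoMetadataManager runWithHarness() {
        try (DemoHarness harness = new DemoHarness()) {
            DemoMetadataManager manager = harness.remoteLogMetadataManager();
            // ... test assertions against `manager` would go here ...
            return manager;
        }
    }

    public static void main(String[] args) {
        System.out.println("closed after try: " + runWithHarness().isClosed());
    }
}
```

With this shape the test no longer needs an explicit `finally` block to close the manager; the harness is closed even when an assertion inside the `try` body fails.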



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103912972

   While going through the usages, it looks to me like the LogOffsetMetadata 
conversion that happens in KafkaMetadataLog is not correct. Could someone please 
double-check?
   
   ```
   org.apache.kafka.storage.internals.log.LogOffsetMetadata -> 
org.apache.kafka.raft.LogOffsetMetadata
   ```
   
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala#L226
   
   Question: Why do we leave 
org.apache.kafka.raft.LogOffsetMetadata#segmentPosition empty when 
hwm.messageOffsetOnly is false?
   
   
   
   





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1596279690


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -415,153 +474,246 @@ public void testResetOffsetsExportImportPlan() throws 
Exception {
 TopicPartition t1p1 = new TopicPartition(topic1, 1);
 TopicPartition t2p0 = new TopicPartition(topic2, 0);
 TopicPartition t2p1 = new TopicPartition(topic2, 1);
-createTopic(topic1, 2, 1, new Properties(), listenerName(), new 
Properties());
-createTopic(topic2, 2, 1, new Properties(), listenerName(), new 
Properties());
+String[] cgcArgs = buildArgsForGroups(cluster, asList(group1, group2),
+"--all-topics", "--to-offset", "2", "--export");
+File file = TestUtils.tempFile("reset", ".csv");
+// Multiple --group's offset import
+String[] cgcArgsExec = buildArgsForGroups(cluster, asList(group1, 
group2),
+"--all-topics",
+"--from-file", file.getCanonicalPath(), "--dry-run");
+// Single --group offset import using "group,topic,partition,offset" 
csv format
+String[] cgcArgsExec2 = buildArgsForGroup(cluster, group1, 
"--all-topics",
+"--from-file", file.getCanonicalPath(), "--dry-run");
 
-String[] cgcArgs = buildArgsForGroups(Arrays.asList(group1, group2), 
"--all-topics", "--to-offset", "2", "--export");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = 
getConsumerGroupService(cgcArgs);
+try (Admin admin = cluster.createAdminClient();
+ ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(file));
+ ConsumerGroupCommand.ConsumerGroupService serviceExec = 
getConsumerGroupService(cgcArgsExec);
+ ConsumerGroupCommand.ConsumerGroupService serviceExec2 = 
getConsumerGroupService(cgcArgsExec2)) {
 
-produceConsumeAndShutdown(topic1, group1, 100, 1);
-produceConsumeAndShutdown(topic2, group2, 100, 1);
+admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1),
+new NewTopic(topic2, 2, (short) 1))).all().get();
 
-awaitConsumerGroupInactive(consumerGroupCommand, group1);
-awaitConsumerGroupInactive(consumerGroupCommand, group2);
+produceConsumeAndShutdown(cluster, topic1, group1, 1);
+produceConsumeAndShutdown(cluster, topic2, group2, 1);
 
-File file = TestUtils.tempFile("reset", ".csv");
+awaitConsumerGroupInactive(service, group1);
+awaitConsumerGroupInactive(service, group2);
 
-Map> exportedOffsets = 
consumerGroupCommand.resetOffsets();
-BufferedWriter bw = new BufferedWriter(new FileWriter(file));
-bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets));
-bw.close();
-Map exp1 = new HashMap<>();
-exp1.put(t1p0, 2L);
-exp1.put(t1p1, 2L);
-Map exp2 = new HashMap<>();
-exp2.put(t2p0, 2L);
-exp2.put(t2p1, 2L);
+Map> 
exportedOffsets = service.resetOffsets();
+bw.write(service.exportOffsetsToCsv(exportedOffsets));
+Map exp1 = new HashMap<>();
+exp1.put(t1p0, 2L);
+exp1.put(t1p1, 2L);
+Map exp2 = new HashMap<>();
+exp2.put(t2p0, 2L);
+exp2.put(t2p1, 2L);
 
-assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
-assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));
+assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
+assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));
 
-// Multiple --group's offset import
-String[] cgcArgsExec = buildArgsForGroups(Arrays.asList(group1, 
group2), "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec = 
getConsumerGroupService(cgcArgsExec);
-Map> importedOffsets = 
consumerGroupCommandExec.resetOffsets();
-assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
-assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
+Map> 
importedOffsets = serviceExec.resetOffsets();
+assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
+assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
 
-// Single --group offset import using "group,topic,partition,offset" 
csv format
-String[] cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", 
"--from-file", file.getCanonicalPath(), "--dry-run");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec2 = 
getConsumerGroupService(cgcArgsExec2);
-Map> importedOffsets2 = 
consumerGroupCommandExec2.resetOffsets();

Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596278649


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Removed the `checkLogStartOffset` from the `convertToOffsetMetadataOrThrow` 
method.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596276757


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   This is a potential loop (not sure when it would be triggered). Updated the 
logic to return the message-only metadata.
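
The three cases listed in the doc comment of the hunk above can be sketched as follows. This is a simplified model under stated assumptions, not the `UnifiedLog` code: `OffsetMetadataSketch`, `UNKNOWN_POSITION`, the use of `IndexOutOfBoundsException` in place of `OffsetOutOfRangeException`, and the placeholder position are all hypothetical.

```java
// Hypothetical, simplified model; not Kafka's LogOffsetMetadata.
class OffsetMetadataSketch {
    // Sentinel meaning "segment position unknown" (message-offset-only metadata).
    static final int UNKNOWN_POSITION = -1;

    final long messageOffset;
    final int relativePosition;

    OffsetMetadataSketch(long messageOffset, int relativePosition) {
        this.messageOffset = messageOffset;
        this.relativePosition = relativePosition;
    }

    boolean messageOffsetOnly() {
        return relativePosition == UNKNOWN_POSITION;
    }

    static OffsetMetadataSketch convert(long offset,
                                        long logStartOffset,
                                        long localLogStartOffset,
                                        long logEndOffset) {
        // Case 1: below the log start offset, the offset is out of range.
        // IndexOutOfBoundsException stands in for OffsetOutOfRangeException.
        if (offset < logStartOffset) {
            throw new IndexOutOfBoundsException(
                "offset " + offset + " is below log start offset " + logStartOffset);
        }
        // Cases 2 and 3: the offset is outside the local log, so return
        // message-only metadata instead of throwing.
        if (offset < localLogStartOffset || offset > logEndOffset) {
            return new OffsetMetadataSketch(offset, UNKNOWN_POSITION);
        }
        // Otherwise a real implementation would look up the segment position;
        // this value is a placeholder so the sketch stays self-contained.
        int placeholderPosition = (int) (offset - localLogStartOffset);
        return new OffsetMetadataSketch(offset, placeholderPosition);
    }
}
```

Returning message-only metadata in cases 2 and 3 means callers must check `messageOffsetOnly()` before relying on a position, which is the trade-off for not throwing.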






Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates [kafka]

2024-05-09 Thread via GitHub


appchemist commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580562449


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##
@@ -326,22 +326,34 @@ private void handleInitializeErrors(final CompletedFetch 
completedFetch, final E
 final TopicPartition tp = completedFetch.partition;
 final long fetchOffset = completedFetch.nextFetchOffset();
 
-if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-error == Errors.REPLICA_NOT_AVAILABLE ||
-error == Errors.KAFKA_STORAGE_ERROR ||
-error == Errors.FENCED_LEADER_EPOCH ||
+if (error == Errors.REPLICA_NOT_AVAILABLE) {
+log.debug("Received replica not available error in fetch for 
partition {}", tp);

Review Comment:
   @kirktrue It's just a debug log, but it's different from the previous log. 
Is that okay?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
 if (partitionData.currentLeader().leaderId() != -1 && 
partitionData.currentLeader().leaderEpoch() != -1) {
 partitionsWithUpdatedLeaderInfo.put(partition, new 
Metadata.LeaderIdAndEpoch(
 
Optional.of(partitionData.currentLeader().leaderId()), 
Optional.of(partitionData.currentLeader().leaderEpoch())));
+} else {
+requestMetadataUpdate(metadata, subscriptions, 
partition);
+subscriptions.awaitUpdate(partition);

Review Comment:
   changed



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##
@@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch 
completedFetch, final E
 final long fetchOffset = completedFetch.nextFetchOffset();
 
 if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-error == Errors.REPLICA_NOT_AVAILABLE ||
+error == Errors.FENCED_LEADER_EPOCH) {
+log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
+requestMetadataUpdate(metadata, subscriptions, tp);
+} else if (error == Errors.REPLICA_NOT_AVAILABLE ||
 error == Errors.KAFKA_STORAGE_ERROR ||
-error == Errors.FENCED_LEADER_EPOCH ||
 error == Errors.OFFSET_NOT_AVAILABLE) {
 log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
 requestMetadataUpdate(metadata, subscriptions, tp);
+subscriptions.awaitUpdate(tp);

Review Comment:
   changed






Re: [PR] KAFKA-14885: fix kafka client connect to the broker that offline from… [kafka]

2024-05-09 Thread via GitHub


Stephan14 commented on PR #13531:
URL: https://github.com/apache/kafka/pull/13531#issuecomment-2103820670

   Hi @divijvaidya, Can you help me to review it?





Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on code in PR #15862:
URL: https://github.com/apache/kafka/pull/15862#discussion_r1596214008


##
core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:
##
@@ -18,31 +18,53 @@
 package kafka.test.junit;
 
 import kafka.test.annotation.ClusterTemplate;
+import kafka.test.ClusterGenerator;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import java.util.function.Consumer;
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ClusterTestExtensionsUnitTest {
+
+static class StubTest {
+@ClusterTemplate("cfgFoo")
+void testFoo() {}
+static void cfgFoo(ClusterGenerator gen) { /* ... */ }
+
+@ClusterTemplate("")
+void testBar() {}
+
+};
+
+private ExtensionContext buildExtensionContext(String methodName) throws 
Exception {

Review Comment:
   please add `@SuppressWarnings({"unchecked", "rawtypes"})`



##
core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:
##
@@ -18,31 +18,53 @@
 package kafka.test.junit;
 
 import kafka.test.annotation.ClusterTemplate;
+import kafka.test.ClusterGenerator;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import java.util.function.Consumer;
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ClusterTestExtensionsUnitTest {
+
+static class StubTest {
+@ClusterTemplate("cfgFoo")
+void testFoo() {}
+static void cfgFoo(ClusterGenerator gen) { /* ... */ }
+
+@ClusterTemplate("")
+void testBar() {}
+
+};
+
+private ExtensionContext buildExtensionContext(String methodName) throws 
Exception {
+ExtensionContext extensionContext = mock(ExtensionContext.class);
+Class clazz = StubTest.class;
+Method method = clazz.getDeclaredMethod(methodName);
+when(extensionContext.getRequiredTestClass()).thenReturn(clazz);
+when(extensionContext.getRequiredTestMethod()).thenReturn(method);
+return extensionContext;
+}
 @Test
 @SuppressWarnings("unchecked")

Review Comment:
   this is unused now.



##
core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:
##
@@ -18,31 +18,53 @@
 package kafka.test.junit;
 
 import kafka.test.annotation.ClusterTemplate;
+import kafka.test.ClusterGenerator;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import java.util.function.Consumer;
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ClusterTestExtensionsUnitTest {
+
+static class StubTest {
+@ClusterTemplate("cfgFoo")
+void testFoo() {}
+static void cfgFoo(ClusterGenerator gen) { /* ... */ }
+
+@ClusterTemplate("")
+void testBar() {}
+
+};
+
+private ExtensionContext buildExtensionContext(String methodName) throws 
Exception {
+ExtensionContext extensionContext = mock(ExtensionContext.class);
+Class clazz = StubTest.class;
+Method method = clazz.getDeclaredMethod(methodName);
+when(extensionContext.getRequiredTestClass()).thenReturn(clazz);
+when(extensionContext.getRequiredTestMethod()).thenReturn(method);
+return extensionContext;
+}
 @Test
 @SuppressWarnings("unchecked")
 void testProcessClusterTemplate() {
-ClusterTestExtensions ext = new ClusterTestExtensions();
-ExtensionContext context = mock(ExtensionContext.class);
-Consumer testInvocations = 
mock(Consumer.class);
-ClusterTemplate annot = mock(ClusterTemplate.class);
-when(annot.value()).thenReturn("").thenReturn(" ");
-
-Assertions.assertThrows(IllegalStateException.class, () ->
-ext.processClusterTemplate(context, annot, testInvocations)
+ClusterTestExtensions clusterTestExtensions = new 
ClusterTestExtensions();
+
+assertEquals("ClusterConfig generator method should provide at least 
one config.",

[jira] [Resolved] (KAFKA-12947) Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest ...

2024-05-09 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12947.

Resolution: Duplicate

this is fixed by https://github.com/apache/kafka/pull/14623

> Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest ...
> --
>
> Key: KAFKA-12947
> URL: https://issues.apache.org/jira/browse/KAFKA-12947
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Dalibor Plavcic
>Priority: Major
>
> For Kafka-7438



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-05-09 Thread via GitHub


chia7712 merged PR #14716:
URL: https://github.com/apache/kafka/pull/14716





Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1596196298


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) {
 static  AutoCloseable buildConsumers(int numberOfConsumers,

Review Comment:
   Could we have two `buildConsumers` to deal with "assign"/"subscribe" 
individually?






Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on PR #15890:
URL: https://github.com/apache/kafka/pull/15890#issuecomment-2103767353

   > But I'm confused. Is there anything I can do about it
   
   @dongjinleekr filed a PR (#10428) as a starting point. However, that PR now has 
conflicts, and I'm not sure whether @dongjinleekr has free cycles to address 
them. You can file a new PR to address that.





Re: [PR] MINOR; Validate at least one control record [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on code in PR #15912:
URL: https://github.com/apache/kafka/pull/15912#discussion_r1596192617


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -284,6 +284,8 @@ private int 
validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) {
 );
 } else if (numberOfRecords == null) {
 throw new IllegalArgumentException("valueCreator didn't create a 
batch with the count");
+} else if (numberOfRecords < 1) {

Review Comment:
   Is it possible to add a new unit test to `BatchAccumulatorTest` for this case?
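
A rough sketch of what such a check could cover, assuming a hypothetical `validateCount` helper that mirrors the two branches in the hunk above. It is not the real `BatchAccumulator` API, and the message for the `< 1` branch is invented for illustration.

```java
class RecordCountValidationSketch {
    // Hypothetical helper mirroring the null check and the new "< 1" check.
    static int validateCount(Integer numberOfRecords) {
        if (numberOfRecords == null) {
            throw new IllegalArgumentException(
                "valueCreator didn't create a batch with the count");
        } else if (numberOfRecords < 1) {
            // Assumed message; the original hunk does not show it.
            throw new IllegalArgumentException(
                "Expected at least one record, got " + numberOfRecords);
        }
        return numberOfRecords;
    }

    // Small helper so the sketch runs without JUnit: true if the count is rejected.
    static boolean rejects(Integer count) {
        try {
            validateCount(count);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null rejected: " + rejects(null));
        System.out.println("0 rejected: " + rejects(0));
        System.out.println("1 rejected: " + rejects(1));
    }
}
```

In a JUnit 5 test the same checks would normally be written with `assertThrows(IllegalArgumentException.class, ...)`.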






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-09 Thread via GitHub


chickenchickenlove commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2103761393

   Thanks for your guidance.
   It was very helpful to me.





Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-05-09 Thread via GitHub


mjsax commented on PR #14360:
URL: https://github.com/apache/kafka/pull/14360#issuecomment-2103760848

   Thanks for the PR @Cerchie! Merged to `trunk`.





Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-05-09 Thread via GitHub


mjsax merged PR #14360:
URL: https://github.com/apache/kafka/pull/14360





Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-05-09 Thread via GitHub


mjsax commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1596187852


##
docs/streams/developer-guide/config-streams.html:
##
@@ -300,12 +306,12 @@ num.standby.replicasnull
   
-  default.windowed.key.serde.inner
+  default.windowed.key.serde.inner 
(Deprecated. Use windowed.inner.class.serde instead.)
 Medium
 Default serializer/deserializer for the inner 
class of windowed keys, implementing the Serde interface.
 null
   
-  default.windowed.value.serde.inner
+  default.windowed.value.serde.inner 
(Deprecated. Use windowed.inner.class.serde instead.)
 Medium
 Default serializer/deserializer for the inner 
class of windowed values, implementing the Serde interface.
 null

Review Comment:
   I just noticed by chance that we use a different format for marking a 
deprecated config:
   ```
   default.dsl.store
 Low
 
   [DEPRECATED] The default state store type used by DSL operators. 
Deprecated in
   favor of dsl.store.suppliers.class
   
   ```
   
   I think we should use a unified format -- I don't have a preference for which 
one.






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-09 Thread via GitHub


mjsax commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2103753531

   Thanks for the PR @chickenchickenlove! Merged to `trunk`.





Re: [PR] KAFKA-16688: Use helper method to shutdown ExecutorService [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on PR #15886:
URL: https://github.com/apache/kafka/pull/15886#issuecomment-2103753542

   I agree with @soarez's observation. I feel this patch can be merged to make 
life easier for all Kafka developers. #15891 can revert this patch to observe 
which timer task hangs.





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-09 Thread via GitHub


mjsax merged PR #15573:
URL: https://github.com/apache/kafka/pull/15573





[jira] [Resolved] (KAFKA-16484) Support to define per broker/controller property by ClusterConfigProperty

2024-05-09 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16484.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Support to define per broker/controller property by ClusterConfigProperty
> -
>
> Key: KAFKA-16484
> URL: https://issues.apache.org/jira/browse/KAFKA-16484
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8.0
>
>
> A property set via `ClusterConfigProperty` is applied to all brokers, and 
> hence we can't give each broker individual props to test racks.
>  
> It seems to me we can add a new field "id" to `ClusterConfigProperty` to 
> declare that the property should be applied to a specific broker (or controller). 
> The default value is -1, meaning it is applied to all nodes.
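
The "id" idea described in the issue can be sketched as follows. This is an illustration under stated assumptions: `DemoProperty` and `PerNodeProperties` are hypothetical plain classes rather than the real `ClusterConfigProperty` annotation, and letting a node-specific entry override a cluster-wide one is an assumed precedence rule that the issue text does not state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one property entry; not the Kafka annotation.
class DemoProperty {
    // Default id: the property applies to every node.
    static final int ALL_NODES = -1;

    final int id;
    final String key;
    final String value;

    DemoProperty(int id, String key, String value) {
        this.id = id;
        this.key = key;
        this.value = value;
    }
}

class PerNodeProperties {
    // Resolves the effective properties for one node.
    static Map<String, String> forNode(int nodeId, List<DemoProperty> props) {
        Map<String, String> result = new HashMap<>();
        // First pass: cluster-wide entries (id == -1).
        for (DemoProperty p : props) {
            if (p.id == DemoProperty.ALL_NODES) {
                result.put(p.key, p.value);
            }
        }
        // Second pass: entries for this node override cluster-wide ones
        // (assumed precedence, not taken from the issue).
        for (DemoProperty p : props) {
            if (p.id == nodeId) {
                result.put(p.key, p.value);
            }
        }
        return result;
    }
}
```

For the rack use case in the issue, two entries with the same `broker.rack` key and ids 0 and 1 would give each broker its own rack while every other property stays shared.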





Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-05-09 Thread via GitHub


chia7712 merged PR #15715:
URL: https://github.com/apache/kafka/pull/15715





Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-09 Thread via GitHub


mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596180605


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.StateSerdes;
+
+
+public class StoreSerdeInitializer {
+static  StateSerdes prepareStoreSerde(final StateStoreContext 
context, final String storeName,
+  final String 
changelogTopic, final Serde keySerde,

Review Comment:
   nit: formatting. We should have a single parameter per line, not multiple 
(both lines above) -- also below.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));

Review Comment:
   Should we somehow preserve `e.getMessage()` -- it seems useful?
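
One way to keep the original message, sketched with a hypothetical `DemoConfigException` instead of Kafka's `ConfigException` (whose constructors are not assumed here). The failing `prepareKeySerializer` and its message text are invented for illustration.

```java
// Hypothetical exception type standing in for ConfigException.
class DemoConfigException extends RuntimeException {
    DemoConfigException(String message) {
        super(message);
    }
}

class SinkNodeInitSketch {
    // Simulates a helper that fails because a default serde is missing.
    static void prepareKeySerializer() {
        throw new DemoConfigException("no default key serde configured");
    }

    // Wraps the failure with the node name while keeping the original message,
    // so neither piece of context is lost.
    static void init(String nodeName) {
        try {
            prepareKeySerializer();
        } catch (DemoConfigException e) {
            throw new DemoConfigException(String.format(
                "Failed to initialize key serdes for sink node %s: %s",
                nodeName, e.getMessage()));
        }
    }

    public static void main(String[] args) {
        try {
            init("sink-1");
        } catch (DemoConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Appending `e.getMessage()` keeps the underlying reason visible in logs; where the exception type offers a cause-accepting constructor, passing `e` as the cause would preserve the stack trace as well.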



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());

Review Comment:
   I did dig into `prepareKeySerializer` and `prepareValueSerializer` which 
both use `WrappingNullableUtils#prepareSerializer()` which might call both 
`context.keySerde()` and `context.valueSerde()`, and thus, I believe we could 
currently get an exception when trying to get the key serde, even if default 
key serde is set, but default value serde is not set?
   
   I think this internal helper method needs some updates, too.
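   
   To illustrate the kind of change I have in mind (a sketch under assumptions, 
   not the actual `WrappingNullableUtils` code): resolve the default lazily, so 
   only the serde that is really needed gets looked up. All names below are 
   hypothetical, and plain strings stand in for serializers.
   
   ```java
   import java.util.function.Supplier;

   class LazySerdeLookupSketch {
       // Returns the explicitly configured serializer if there is one; otherwise
       // resolves only the default that is needed (key or value), never both.
       static String prepare(final String configured,
                             final boolean isKey,
                             final Supplier<String> defaultKey,
                             final Supplier<String> defaultValue) {
           if (configured != null) {
               return configured;
           }
           return isKey ? defaultKey.get() : defaultValue.get();
       }

       // Simulates a context whose default value serde is missing.
       static String missingValueSerde() {
           throw new IllegalStateException("default value serde is not set");
       }
   }
   ```
   
   Calling `prepare(null, true, () -> "StringSerializer", 
   LazySerdeLookupSketch::missingValueSerde)` succeeds here, because the value 
   supplier is never invoked for the key path.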



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   Why are we catching `StreamsException`?  Seems the only exception that might 
bubble up here is a `ConfigException`?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##
@@ -156,8 +156,7 @@ public void testTopologyLevelConfigException() {
 
 final ConfigException se = 

Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]

2024-05-09 Thread via GitHub


gongxuanzhang commented on PR #15890:
URL: https://github.com/apache/kafka/pull/15890#issuecomment-2103743672

   @chia7712 
   Got it!
   But I'm confused. Is there anything I can do about it?





Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on code in PR #15897:
URL: https://github.com/apache/kafka/pull/15897#discussion_r1596178459


##
core/src/test/java/kafka/test/annotation/ClusterTest.java:
##
@@ -33,7 +33,7 @@
 @Retention(RUNTIME)
 @TestTemplate
 public @interface ClusterTest {
-Type clusterType() default Type.DEFAULT;
+Type[] clusterTypes() default {};

Review Comment:
   > so wonder if having the full list as default here would be a good 
complementary change?
   > It would mean that we have well defined ClusterTypes as introduced by this 
PR, but every test running on ClusterTest would run for all types, unless 
specified. That removes lots of clusterTypes = {Type.ZK, Type.KRAFT, 
Type.CO_KRAFT}, it's probably the default behaviour we want for a test so that 
no ones accidentally forgets to run on any specific cluster type, and makes it 
easier to maintain (keeping the full list only in this single place, not on 
every test). Thoughts?
   
   I love this idea!
   






Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on code in PR #15897:
URL: https://github.com/apache/kafka/pull/15897#discussion_r1596176943


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -219,8 +221,8 @@ public static class Builder {
 
 private Builder() {}
 
-public Builder setType(Type type) {
-this.type = type;
+public Builder setTypes(Set types) {
+this.types = Collections.unmodifiableSet(new HashSet<>(types));

Review Comment:
   > so why creating a new HashSet and not simply 
Collections.unmodifiableSet(types)?
   
   We are trying to make `ClusterConfig` immutable. Without the copy, users are 
able to change the `ClusterConfig`'s `types` by modifying the input `types`. We 
can simplify the code with `Set.copyOf` after removing the support of JDK8
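   
   A small standalone sketch of the difference (the class and method names are 
   made up for illustration, and strings stand in for `Type`):
   
   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;

   class DefensiveCopySketch {
       // Wraps the caller's set directly: later changes to the input stay visible.
       static Set<String> viewOnly(final Set<String> types) {
           return Collections.unmodifiableSet(types);
       }

       // Copies first, then wraps: the result is detached from the caller's set.
       static Set<String> copied(final Set<String> types) {
           return Collections.unmodifiableSet(new HashSet<>(types));
       }
   }
   ```
   
   If the caller adds an element to the input after the call, the set returned 
   by `viewOnly` grows with it, while the one returned by `copied` does not.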






Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on PR #15890:
URL: https://github.com/apache/kafka/pull/15890#issuecomment-2103727012

   @gongxuanzhang This is still a huge patch, which contains a lot of changes 
across all modules. This is an important improvement which can impact all Kafka 
developers, and so we should split it into small PRs.
   
   Personally, the first patch should include 1) new checkstyle and 2) auto 
formatter ( see https://issues.apache.org/jira/browse/KAFKA-12572)
   
   After first patch gets merged, we can apply the rule to all modules "one by 
one".  Yes, it needs many PRs but they make reviews workable.
   
   





Re: [PR] KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. [kafka]

2024-05-09 Thread via GitHub


ableegoldman commented on PR #15887:
URL: https://github.com/apache/kafka/pull/15887#issuecomment-2103726299

   Test failures were unrelated. Merged to trunk





Re: [PR] KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. [kafka]

2024-05-09 Thread via GitHub


ableegoldman merged PR #15887:
URL: https://github.com/apache/kafka/pull/15887





Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on PR #15904:
URL: https://github.com/apache/kafka/pull/15904#issuecomment-2103720759

   @junrao thanks for reviewing this rough patch :(





Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]

2024-05-09 Thread via GitHub


gongxuanzhang commented on PR #15890:
URL: https://github.com/apache/kafka/pull/15890#issuecomment-2103687645

   @chia7712 
   I added a rule to checkstyle.xml. 
   You can run:
   ```
   ./gradlew checkstyleMain checkstyleTest --continue
   ```
   





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596055964


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {
+// This case is to handle the stale high-watermark on the 
leader until it gets updated with the correct value

Review Comment:
   Perhaps change to something like the following.
   
   "If we don't know the position of the offset on log segments, just 
pessimistically assume that we only gained 1 byte. 
This can happen when the high watermark is stale, but should be rare."  



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   `LocalLog.read()` also calls `convertToOffsetMetadataOrThrow`.
   
   ```
 else if (startOffset > maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
   ```
   It seems this could lead to infinite recursion. To avoid that, we could 
change the above code to avoid calling `convertToOffsetMetadataOrThrow` and 
return a message-only `LogOffsetMetadata` instead to `emptyFetchDataInfo`.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Good point. Since we change the logic such that it's ok not to have the 
metadata for an offset, we could just skip the call to `checkLogStartOffset`.



##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {

Review Comment:
   `fetchOffset` typically shouldn't be message only. But it doesn't hurt to 
have the check.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata

Review Comment:
   lesser => less






[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2024-05-09 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845142#comment-17845142
 ] 

Ayoub Omari commented on KAFKA-4212:


My company was looking for such a feature earlier this year. As a workaround we 
use a processor+punctuator similar to what [~savulchik] suggests. The problem 
is that we end up doing this for every KS application in our system.

Potential suggestion: In the _Topology_ class we can call both _addStateStore_ 
and {_}addProcessor{_}, so why not overload _addStateStore_ with an additional 
TTL parameter and define the processor+punctuator within it (so the user 
wouldn't have to do it themselves)? If we are unsure how frequently we should 
punctuate, we can give control to the user (TTL would be a class with an 
additional period field)
{code:java}
TTL(value: Duration, punctuateInterval: Duration){code}

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs needs to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


junrao commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595995650


##
raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java:
##
@@ -79,20 +81,79 @@ void testRemoveVoter() {
 );
 }
 
+@Test
+void testIsVoterWithDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 
2, 3), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(2, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(4, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testIsVoterWithoutDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 
2, 3), false);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testStandaloneAndIsOnlyVoter() {

Review Comment:
   testIsOnlyVoterInStandalone?



##
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -73,23 +88,35 @@ private QuorumState buildQuorumState(Set voters) {
 );
 }
 
-@Test
-public void shouldRecordVoterQuorumState() {
-QuorumState state = buildQuorumState(Utils.mkSet(localId, 1, 2));
+@ParameterizedTest
+@ValueSource(shorts = {0, 1})
+public void shouldRecordVoterQuorumState(short kraftVersion) {
+boolean withDirectoryId = kraftVersion > 0;
+Map voterMap = 
VoterSetTest.voterMap(Utils.mkSet(1, 2), withDirectoryId);
+voterMap.put(localId, VoterSetTest.voterNode(ReplicaKey.of(localId, 
Optional.of(localDirectoryId))));

Review Comment:
   Should we use an empty directory id if kraftVersion is 0?



##
raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java:
##
@@ -79,20 +81,79 @@ void testRemoveVoter() {
 );
 }
 
+@Test
+void testIsVoterWithDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 
2, 3), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(2, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(4, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testIsVoterWithoutDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 
2, 3), false);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testStandaloneAndIsOnlyVoter() {
+Map aVoterMap = 
voterMap(Arrays.asList(1), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Optional.empty())));
+assertFalse(
+voterSet.isOnlyVoter(ReplicaKey.of(4, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testNotStandaloneAndIsOnlyVoter() {

Review Comment:
   testIsOnlyVoterInNonStandalone?






Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-05-09 Thread via GitHub


soarez commented on code in PR #15697:
URL: https://github.com/apache/kafka/pull/15697#discussion_r1593094798


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2466,7 +2467,6 @@ class ReplicaManager(val config: KafkaConfig,
s"for partitions 
${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the 
failed log directory $dir.")
 }
 // retrieve the UUID here because logManager.handleLogDirFailure handler 
removes it

Review Comment:
   We should move or remove this comment now that the `uuid` declaration has 
been moved up.






Re: [PR] KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. [kafka]

2024-05-09 Thread via GitHub


ableegoldman commented on code in PR #15887:
URL: https://github.com/apache/kafka/pull/15887#discussion_r1595980303


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.streams.StreamsConfig;
+
+/**
+ * Assignment related configs for the Kafka Streams {@link TaskAssignor}.
+ */
+public class AssignmentConfigs {
+private final long acceptableRecoveryLag;
+private final int maxWarmupReplicas;
+private final int nonOverlapCost;
+private final int numStandbyReplicas;
+private final long probingRebalanceIntervalMs;
+private final List rackAwareAssignmentTags;
+private final int trafficCost;
+private final String assignmentStrategy;
+
+public AssignmentConfigs(final StreamsConfig configs) {
+acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+trafficCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
+nonOverlapCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
+assignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+}
+
+public AssignmentConfigs(final Long acceptableRecoveryLag,
+  final Integer maxWarmupReplicas,

Review Comment:
   nit: fix alignment for this and below params



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.streams.StreamsConfig;
+
+/**
+ * Assignment related configs for the Kafka Streams {@link TaskAssignor}.
+ */
+public class AssignmentConfigs {
+private final long acceptableRecoveryLag;
+private final int maxWarmupReplicas;
+private final int nonOverlapCost;
+private final int numStandbyReplicas;
+private final long probingRebalanceIntervalMs;
+private final List rackAwareAssignmentTags;
+private final int trafficCost;
+private final String assignmentStrategy;
+
+public AssignmentConfigs(final StreamsConfig configs) {
+acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);

Review Comment:
   nit: just call the below constructor 



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the 

Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595963491


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;

Review Comment:
   I think I already updated the KIP to match this comment but I'll take a look 
again.






Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-09 Thread via GitHub


ableegoldman commented on code in PR #15909:
URL: https://github.com/apache/kafka/pull/15909#discussion_r1595957783


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -193,11 +193,12 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 }
 pollTimer.update(currentTimeMs);
 if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
-logger.warn("Consumer poll timeout has expired. This means the 
time between " +
+logger.warn("Consumer poll timeout has expired, exceeded by {} ms. 
This means the time between " +

Review Comment:
   IIUC this is what gets logged when the _heartbeat thread_ notices the 
consumer has failed to poll in time and dropped out of the group -- so the 
"time exceeded" is just going to be roughly the max poll interval + the 
heartbeat interval, no?
   
   I do think it's a great idea to log the amount of time by which the max poll 
interval was exceeded, but imo the more useful information is how long after 
the max poll interval the consumer took to actually hit poll again, not how 
long the heartbeat thread took to notice it. 






Re: [PR] MINOR; Validate at least one control record [kafka]

2024-05-09 Thread via GitHub


junrao commented on code in PR #15912:
URL: https://github.com/apache/kafka/pull/15912#discussion_r1595930091


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -161,8 +161,8 @@ public VotersRecord toVotersRecord(short version) {
  * An overlapping majority means that for all majorities in {@code this} 
set of voters and for
  * all majority in {@code that} set of voters, they have at least one 
voter in common.
  *
- * If this function returns true is means that one of the voter set 
commits an offset, it means
- * that the other voter set cannot commit a conflicting offset.
+ * If this function returns true, it means that if one of the set of 
voters commits an offset, the
+ * the other set of voters cannot commit a conflicting offset.

Review Comment:
   the the other => the other






Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


junrao commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595923193


##
raft/src/main/java/org/apache/kafka/raft/internals/TreeMapLogHistory.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.Map;
+
+/**
+ * A implementation for {@code LogHistory} which uses a red-black tree to 
store values sorted by offset.
+ */
+final public class TreeMapLogHistory implements LogHistory {
+private final NavigableMap history = new TreeMap<>();
+
+@Override
+public void addAt(long offset, T value) {
+if (offset < 0) {
+throw new IllegalArgumentException(
+String.format("Next offset %d must be greater than or equal to 
0", offset)
+);
+}
+
+Map.Entry lastEntry = history.lastEntry();
+if (lastEntry != null && offset <= lastEntry.getKey()) {
+throw new IllegalArgumentException(
+String.format("Next offset %d must be greater than the last 
offset %d", offset, lastEntry.getKey())
+);
+}
+
+history.put(offset, value);
+}
+
+@Override
+public Optional<T> valueAtOrBefore(long offset) {
+return 
Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue);
+}
+
+@Override
+public Optional<Entry<T>> lastEntry() {
+return Optional.ofNullable(history.lastEntry()).map(entry -> new 
Entry<>(entry.getKey(), entry.getValue()));
+}
+
+@Override
+public void truncateNewEntries(long endOffset) {
+history.tailMap(endOffset, true).clear();
+}
+
+@Override
+public void truncateOldEntries(long startOffset) {
+NavigableMap<Long, T> lesserValues = history.headMap(startOffset, 
true);
+while (lesserValues.size() > 1) {

Review Comment:
   Thanks for the explanation. I can see that it works now since 
`lesserValues.size() > 1` is testing greater than, instead of greater than or 
equal to.
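
   The behavior discussed here can be sketched as follows. This is a minimal 
illustration and not the PR's code: `LogHistorySketch` is a hypothetical 
stand-in, and the loop body (`pollFirstEntry`) is an assumption because the 
quoted diff stops at the `while` line. It shows why `size() > 1` matters: the 
newest entry at or before `startOffset` is kept, so a lookup at `startOffset` 
still resolves after truncation.

```java
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical stand-in for the class under review; names are illustrative only.
final class LogHistorySketch<T> {
    private final NavigableMap<Long, T> history = new TreeMap<>();

    void addAt(long offset, T value) {
        history.put(offset, value);
    }

    // Value of the entry with the largest offset <= the given offset, if any.
    Optional<T> valueAtOrBefore(long offset) {
        return Optional.ofNullable(history.floorEntry(offset)).map(e -> e.getValue());
    }

    // Assumed loop body: remove the oldest entry until at most one entry at or
    // before startOffset remains. headMap returns a live view, so removing from
    // the view removes from the backing map.
    void truncateOldEntries(long startOffset) {
        NavigableMap<Long, T> lesserValues = history.headMap(startOffset, true);
        while (lesserValues.size() > 1) {
            lesserValues.pollFirstEntry();
        }
    }

    int size() {
        return history.size();
    }
}
```

   With entries at offsets 0, 5 and 10, truncating at 7 removes only the entry 
at 0. Using `>= 1` instead would also remove the entry at 5 and lose the value 
that is in effect at offset 7.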






Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


junrao commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595922267


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;

Review Comment:
   This approach is fine, but we need to update the KIP.
   






Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595916951


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+/**
+ * A type for storing the historical value of the set of voters.
+ *
+ * This type can be use to keep track in-memory the sets for voters stored in 
the latest snapshot
+ * and log. This is useful when generating a new snapshot at a given offset or 
when evaulating
+ * the latest set of voters.
+ */
+final public class VoterSetHistory {
+private final Optional<VoterSet> staticVoterSet;
+private final LogHistory<VoterSet> votersHistory = new 
TreeMapLogHistory<>();
+
+VoterSetHistory(Optional<VoterSet> staticVoterSet) {
+this.staticVoterSet = staticVoterSet;
+}
+
+/**
+ * Add a new value at a given offset.
+ *
+ * The provided {@code offset} must be greater than or equal to 0 and must 
be greater than the
+ * offset of all previous calls to this method.
+ *
+ * @param offset the offset
+ * @param value the value to store

Review Comment:
   Fixed.



##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+/**
+ * A type for storing the historical value of the set of voters.
+ *
+ * This type can be use to keep track in-memory the sets for voters stored in 
the latest snapshot
+ * and log. This is useful when generating a new snapshot at a given offset or 
when evaulating
+ * the latest set of voters.
+ */
+final public class VoterSetHistory {
+private final Optional<VoterSet> staticVoterSet;
+private final LogHistory<VoterSet> votersHistory = new 
TreeMapLogHistory<>();
+
+VoterSetHistory(Optional<VoterSet> staticVoterSet) {
+this.staticVoterSet = staticVoterSet;
+}
+
+/**
+ * Add a new value at a given offset.
+ *
+ * The provided {@code offset} must be greater than or equal to 0 and must 
be greater than the
+ * offset of all previous calls to this method.
+ *
+ * @param offset the offset
+ * @param value the value to store
+ * @throws IllegalArgumentException if the offset is not greater than all 
previous offsets
+ */
+public void addAt(long offset, VoterSet voters) {
+Optional<LogHistory.Entry<VoterSet>> lastEntry = 
votersHistory.lastEntry();
+if (lastEntry.isPresent() && lastEntry.get().offset() >= 0) {
+// If the last voter set comes from the replicated log then the 
majorities must overlap.
+// This ignores the static voter set and the bootstrapped voter 
set since they come from
+// the configuration and the KRaft leader never guaranteed that 
they are the same across
+// all replicas.
+VoterSet lastVoterSet = lastEntry.get().value();
+if (!lastVoterSet.hasOverlappingMajority(voters)) {
+throw new IllegalArgumentException(
+String.format(
+"Last voter set %s doesn't have an overlapping 
majority with the new voter set %s",
+lastVoterSet,
+voters
+)
+);
+}
+}
+
+

Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595915596


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##
@@ -728,83 +730,114 @@ public static MemoryRecords withLeaderChangeMessage(
 ByteBuffer buffer,
 LeaderChangeMessage leaderChangeMessage
 ) {
-writeLeaderChangeMessage(buffer, initialOffset, timestamp, 
leaderEpoch, leaderChangeMessage);
-buffer.flip();
-return MemoryRecords.readableRecords(buffer);
+try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+initialOffset,
+timestamp,
+leaderEpoch,
+buffer
+)
+) {
+builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+return builder.build();
+}
 }
 
-private static void writeLeaderChangeMessage(
-ByteBuffer buffer,
+public static MemoryRecords withSnapshotHeaderRecord(
 long initialOffset,
 long timestamp,
 int leaderEpoch,
-LeaderChangeMessage leaderChangeMessage
+ByteBuffer buffer,
+SnapshotHeaderRecord snapshotHeaderRecord
 ) {
-try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-TimestampType.CREATE_TIME, initialOffset, timestamp,
-RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
-false, true, leaderEpoch, buffer.capacity())
+try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+initialOffset,
+timestamp,
+leaderEpoch,
+buffer
+)
 ) {
-builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+builder.appendSnapshotHeaderMessage(timestamp, 
snapshotHeaderRecord);
+return builder.build();
 }
 }
 
-public static MemoryRecords withSnapshotHeaderRecord(
+public static MemoryRecords withSnapshotFooterRecord(
 long initialOffset,
 long timestamp,
 int leaderEpoch,
 ByteBuffer buffer,
-SnapshotHeaderRecord snapshotHeaderRecord
+SnapshotFooterRecord snapshotFooterRecord
 ) {
-writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, 
leaderEpoch, snapshotHeaderRecord);
-buffer.flip();
-return MemoryRecords.readableRecords(buffer);
+try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+initialOffset,
+timestamp,
+leaderEpoch,
+buffer
+)
+) {
+builder.appendSnapshotFooterMessage(timestamp, 
snapshotFooterRecord);
+return builder.build();
+}
 }
 
-private static void writeSnapshotHeaderRecord(
-ByteBuffer buffer,
+public static MemoryRecords withKRaftVersionRecord(
 long initialOffset,
 long timestamp,
 int leaderEpoch,
-SnapshotHeaderRecord snapshotHeaderRecord
+ByteBuffer buffer,
+KRaftVersionRecord kraftVersionRecord
 ) {
-try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-TimestampType.CREATE_TIME, initialOffset, timestamp,
-RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
-false, true, leaderEpoch, buffer.capacity())
+try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+initialOffset,
+timestamp,
+leaderEpoch,
+buffer
+)
 ) {
-builder.appendSnapshotHeaderMessage(timestamp, 
snapshotHeaderRecord);
+builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
+return builder.build();
 }
 }
 
-public static MemoryRecords withSnapshotFooterRecord(
+public static MemoryRecords withVotersRecord(
 long initialOffset,
 long timestamp,
 int leaderEpoch,
 ByteBuffer buffer,
-SnapshotFooterRecord snapshotFooterRecord
+VotersRecord votersRecord
 ) {
-writeSnapshotFooterRecord(buffer, initialOffset, timestamp, 
leaderEpoch, snapshotFooterRecord);
-buffer.flip();
-return MemoryRecords.readableRecords(buffer);
+try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+initialOffset,
+timestamp,
+leaderEpoch,
+buffer
+)
+) {
+builder.appendVotersMessage(timestamp, votersRecord);
+return builder.build();
+}
 }
 
-private static void 

Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595916619


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting

Review Comment:
   Fixed.



##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+/**
+ * A type for storing the historical value of the set of voters.
+ *
+ * This type can be use to keep track in-memory the sets for voters stored in 
the latest snapshot

Review Comment:
   Fixed.






Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595916355


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -245,6 +255,42 @@ private void appendControlMessage(Function valueCreat
 }
 }
 
+private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) 
{
+// Confirm that it is at most one batch and it is a control record

Review Comment:
   Yeah. I fixed the comment and the implementation.



##
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.ControlRecord;
+import org.apache.kafka.raft.Isolation;
+import org.apache.kafka.raft.LogFetchInfo;
+import org.apache.kafka.raft.ReplicatedLog;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+/**
+ * The KRaft state machine for tracking control records in the topic partition.
+ *
+ * This type keeps track of changes to the finalized kraft.version and the 
sets of voters between
+ * the latest snasphot and the log end offset.
+ *
+ * The are two actors/threads for this type. One is the KRaft driver which 
indirectly call a lot of
+ * the public methods. The other are the callers of {@code 
RaftClient::createSnapshot} which
+ * indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} 
when freezing a snapshot.
+ */
+final public class KRaftControlRecordStateMachine {
+private final ReplicatedLog log;
+private final RecordSerde serde;
+private final BufferSupplier bufferSupplier;
+private final Logger logger;
+private final int maxBatchSizeBytes;
+
+// These objects are synchronized using their respective object monitor. 
The two actors
+// are the KRaft driver when calling updateState and the RaftClient 
callers when freezing
+// snapshots
+private final VoterSetHistory voterSetHistory;
+private final LogHistory kraftVersionHistory = new 
TreeMapLogHistory<>();
+
+// This synchronization is enough because
+// 1. The write operation updateState only sets the value without reading 
it and updates to
+// voterSetHistory or kraftVersionHistory are done before setting the 
nextOffset
+//
+// 2. The read operations lastVoterSet, voterSetAtOffset and 
kraftVersionAtOffset read
+// the nextOffset first before reading voterSetHistory or 
kraftVersionHistory
+private volatile long nextOffset = 0;
+
+/**
+ * Constructs an internal log listener
+ *
+ * @param staticVoterSet the set of voter statically configured
+ * @param log the on disk topic partition
+ * @param serde the record decoder for data records
+ * @param bufferSupplier the supplier of byte buffers
+ * @param maxBatchSizeBytes the maximum size of record batch
+ * @param logContext the log context
+ */
+public KRaftControlRecordStateMachine(
+Optional staticVoterSet,
+ReplicatedLog log,
+RecordSerde serde,
+BufferSupplier bufferSupplier,
+int maxBatchSizeBytes,
+LogContext logContext
+) {
+this.log = log;
+this.voterSetHistory = new VoterSetHistory(staticVoterSet);
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSizeBytes = maxBatchSizeBytes;
+this.logger = logContext.logger(this.getClass());
+}
+
+/**
+ * Must be called whenever the {@code log} has changed.
+ */
+public void updateState() {
+maybeLoadSnapshot();
+maybeLoadLog();
+

Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595916070


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -370,8 +368,52 @@ private void maybeFireLeaderChange() {
 }
 }
 
-@Override
-public void initialize() {
+public void initialize(
+Map voterAddresses,
+String listenerName,
+QuorumStateStore quorumStateStore,
+Metrics metrics
+) {
+partitionState = new KRaftControlRecordStateMachine(
+Optional.of(VoterSet.fromInetSocketAddresses(listenerName, 
voterAddresses)),
+log,
+serde,
+BufferSupplier.create(),
+MAX_BATCH_SIZE_BYTES,
+logContext
+);
+// Read the entire log
+logger.info("Reading KRaft snapshot and log as part of the 
initialization");
+partitionState.updateState();
+
+requestManager = new RequestManager(
+partitionState.lastVoterSet().voterIds(),
+quorumConfig.retryBackoffMs(),
+quorumConfig.requestTimeoutMs(),
+random
+);
+
+quorum = new QuorumState(
+nodeId,
+partitionState.lastVoterSet().voterIds(),
+quorumConfig.electionTimeoutMs(),
+quorumConfig.fetchTimeoutMs(),
+quorumStateStore,
+time,
+logContext,
+random
+);
+
+kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
+// All Raft voters are statically configured and known at startup
+// so there are no unknown voter connections. Report this metric as 0.
+kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
+
+VoterSet lastVoterSet = partitionState.lastVoterSet();

Review Comment:
   Fixed.






Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595915829


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1470,6 +1524,10 @@ private boolean handleFetchSnapshotResponse(
 quorum.leaderIdOrSentinel()
 );
 
+// This will aways reload the snapshot because the internal 
next offset

Review Comment:
   Fixed.



##
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.ControlRecord;
+import org.apache.kafka.raft.Isolation;
+import org.apache.kafka.raft.LogFetchInfo;
+import org.apache.kafka.raft.ReplicatedLog;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+/**
+ * The KRaft state machine for tracking control records in the topic partition.
+ *
+ * This type keeps track of changes to the finalized kraft.version and the 
sets of voters between
+ * the latest snasphot and the log end offset.
+ *
+ * The are two actors/threads for this type. One is the KRaft driver which 
indirectly call a lot of

Review Comment:
   Fixed.






[PR] Add demo template for transactional client [kafka]

2024-05-09 Thread via GitHub


k-raina opened a new pull request, #15913:
URL: https://github.com/apache/kafka/pull/15913

   This is an example code template for the transactional client. This code 
assumes that the new exception types have already been implemented.





[PR] MINOR; Validate at least one control record [kafka]

2024-05-09 Thread via GitHub


jsancio opened a new pull request, #15912:
URL: https://github.com/apache/kafka/pull/15912

   Validate that a control batch in the batch accumulator has at least one 
control record.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-09 Thread via GitHub


C0urante merged PR #6934:
URL: https://github.com/apache/kafka/pull/6934





Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595910078


##
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java:
##
@@ -145,9 +147,7 @@ private Optional> nextBatch() {
 );
 }
 
-if (!batch.records().isEmpty()) {
-return Optional.of(batch);
-}
+return Optional.of(batch);

Review Comment:
   Yes. Good catch. We need an `if` statement. I updated the Javadoc too.






[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug

2024-05-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845111#comment-17845111
 ] 

Matthias J. Sax commented on KAFKA-16584:
-

I am on the PMC and can help you. :) 

After your wiki account is created, please share your wiki ID and we can give 
you write access to the Kafka wiki space, so you can prepare a KIP.

The goal of this ticket is to add a new config for the logging interval, so it 
should not be controversial. An example of another already approved KIP that 
also added a new config is 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+repartition.purge.interval.ms+to+Kafka+Streams].
This should help you write your KIP for this ticket.

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Assignee: dujian
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595898159


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It provides functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;

Review Comment:
   Yes. That is a sufficient condition, but I decided to implement a stricter 
condition: the ids need to be unique. This means that with this 
implementation, to replace a directory id, users first need to 
remove the failed replica key (id, uuid) and then add the new replica key (id, 
uuid').
   
   There are two reasons why I decided to keep the ids unique:
   1. It makes it easier and safer to implement the feature for automatically 
having new controllers join the quorum. I was concerned that the set of voters 
would become unavailable if there was a race where new directory ids kept 
joining the cluster. In this example the cluster would become unavailable and 
the user would not be able to mitigate it: [(1, uuid1), (2, uuid2), (3, uuid3), 
(3, uuid3'), (3, uuid3''), (3, uuid3''')].
   2. Connection management is easier to implement. A lot of code in Kafka 
(e.g. `o.a.k.c.Node`) assumes that ids are unique and that they can be used, along 
with the listener name, to identify an endpoint. I think it would be a big 
effort to extend this to identify endpoints by the replica key. Another example 
is `NodeEndpoints` in `FetchResponse`. That map is indexed by replica id.
   
   The main disadvantage of this implementation is that it would make it 
difficult to design and implement the ability for dynamically "altering the 
metadata/kraft log directory" like Kafka does for regular topic partitions. But 
I think we can discuss that if we ever want to implement that feature in the 
future.
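
The remove-then-add flow described in this comment could look roughly like the sketch below. It is only an illustration under assumptions, not the PR's code: `ReplicaKeySketch`, `VoterSetSketch`, `addVoter` and `removeVoter` are hypothetical names, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for a replica key: (replica id, directory id).
final class ReplicaKeySketch {
    final int id;
    final UUID directoryId;

    ReplicaKeySketch(int id, UUID directoryId) {
        this.id = id;
        this.directoryId = directoryId;
    }
}

// Hypothetical immutable voter set that keeps replica ids unique.
final class VoterSetSketch {
    private final Map<Integer, ReplicaKeySketch> voters;

    VoterSetSketch(Map<Integer, ReplicaKeySketch> voters) {
        this.voters = new HashMap<>(voters);
    }

    // Returns a new set with the voter added, or empty if the id is already present.
    Optional<VoterSetSketch> addVoter(ReplicaKeySketch key) {
        if (voters.containsKey(key.id)) {
            return Optional.empty();
        }
        Map<Integer, ReplicaKeySketch> copy = new HashMap<>(voters);
        copy.put(key.id, key);
        return Optional.of(new VoterSetSketch(copy));
    }

    // Returns a new set with the voter removed, or empty if (id, directory id) does not match.
    Optional<VoterSetSketch> removeVoter(ReplicaKeySketch key) {
        ReplicaKeySketch existing = voters.get(key.id);
        if (existing == null || !existing.directoryId.equals(key.directoryId)) {
            return Optional.empty();
        }
        Map<Integer, ReplicaKeySketch> copy = new HashMap<>(voters);
        copy.remove(key.id);
        return Optional.of(new VoterSetSketch(copy));
    }

    int size() {
        return voters.size();
    }
}
```

With this shape, adding (3, uuid3') while (3, uuid3) is still a voter is rejected, so the set cannot grow into the [(3, uuid3), (3, uuid3'), ...] situation from the example.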




Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]

2024-05-09 Thread via GitHub


lianetm commented on PR #15897:
URL: https://github.com/apache/kafka/pull/15897#issuecomment-2103291571

   Thanks for the patch @FrankYang0529, nice twist. Left some comments. Also 
there are examples for the `ClusterTest` annotation in the [core README 
file](https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/junit/README.md)
 so it should be updated too to reflect the changes.





Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]

2024-05-09 Thread via GitHub


lianetm commented on code in PR #15897:
URL: https://github.com/apache/kafka/pull/15897#discussion_r1595878217


##
core/src/test/java/kafka/test/annotation/ClusterTest.java:
##
@@ -33,7 +33,7 @@
 @Retention(RUNTIME)
 @TestTemplate
 public @interface ClusterTest {
-Type clusterType() default Type.DEFAULT;
+Type[] clusterTypes() default {};

Review Comment:
   This bit is the one I'm still going back and forth on. I really like the direction of 
the PR, removing the types ALL and DEFAULT, but it brings the need to 
specify the full list of cluster types on every test that needs them all, so 
I wonder if having the full list as the default here would be a good complementary 
change? 
   
   It would mean that we have well-defined `ClusterTypes` as introduced by this 
PR, but every test running on `ClusterTest` would run for all types unless 
specified otherwise. That removes lots of `clusterTypes = {Type.ZK, Type.KRAFT, 
Type.CO_KRAFT}`; it's probably the default behaviour we want for a test so that 
no one accidentally forgets to run on a specific cluster type, and it makes it 
easier to maintain (keeping the full list only in this single place, not on 
every test). Thoughts?
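
To make the suggestion concrete, here is a minimal sketch of an annotation whose default already lists every cluster type. The names `TypeSketch`, `ClusterTestSketch` and `ClusterTestSketchExamples` are hypothetical stand-ins, not the real `Type` and `ClusterTest` from the PR.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for the cluster type enum discussed in the PR.
enum TypeSketch { ZK, KRAFT, CO_KRAFT }

// Hypothetical annotation: the full list of types lives only here, as the default.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ClusterTestSketch {
    TypeSketch[] clusterTypes() default {TypeSketch.ZK, TypeSketch.KRAFT, TypeSketch.CO_KRAFT};
}

final class ClusterTestSketchExamples {
    // No types given: the test is meant to run on every cluster type.
    @ClusterTestSketch
    void runsEverywhere() { }

    // Explicit override: the test is meant to run on KRaft only.
    @ClusterTestSketch(clusterTypes = {TypeSketch.KRAFT})
    void runsOnKraftOnly() { }
}
```

The trade-off is that a test which must not run on some type has to opt out explicitly, which seems to match the "run everywhere unless specified" behaviour described above.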






Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595871101


##
raft/src/main/java/org/apache/kafka/raft/internals/TreeMapLogHistory.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.Map;
+
+/**
+ * An implementation of {@code LogHistory} which uses a red-black tree to 
store values sorted by offset.
+ */
+final public class TreeMapLogHistory implements LogHistory {
+private final NavigableMap history = new TreeMap<>();
+
+@Override
+public void addAt(long offset, T value) {
+if (offset < 0) {
+throw new IllegalArgumentException(
+String.format("Next offset %d must be greater than or equal to 
0", offset)
+);
+}
+
+Map.Entry lastEntry = history.lastEntry();
+if (lastEntry != null && offset <= lastEntry.getKey()) {
+throw new IllegalArgumentException(
+String.format("Next offset %d must be greater than the last 
offset %d", offset, lastEntry.getKey())
+);
+}
+
+history.put(offset, value);
+}
+
+@Override
+public Optional valueAtOrBefore(long offset) {
+return 
Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue);
+}
+
+@Override
+public Optional> lastEntry() {
+return Optional.ofNullable(history.lastEntry()).map(entry -> new 
Entry<>(entry.getKey(), entry.getValue()));
+}
+
+@Override
+public void truncateNewEntries(long endOffset) {
+history.tailMap(endOffset, true).clear();
+}
+
+@Override
+public void truncateOldEntries(long startOffset) {
+NavigableMap lesserValues = history.headMap(startOffset, 
true);
+while (lesserValues.size() > 1) {

Review Comment:
   It should be implemented. Take a look at this test:
   ```java
 @Test
 void testTrimPrefixTo() {
 TreeMapLogHistory<String> history = new TreeMapLogHistory<>();
 history.addAt(100, "100");
 history.addAt(200, "200");
   
 ...
   
 history.truncateOldEntries(101);
 assertEquals(Optional.empty(), history.valueAtOrBefore(99));
 assertEquals(Optional.of("100"), history.valueAtOrBefore(100));
   
 ...
   
 history.truncateOldEntries(200);
 assertEquals(Optional.empty(), history.valueAtOrBefore(199));
 assertEquals(Optional.of("200"), history.valueAtOrBefore(200));
 }
   ```
   
   For `truncateOldEntries(101)` there is only one entry in `[0, 101]`, so the 
entry at 100 is not deleted. For `truncateOldEntries(200)` there are two 
entries in `[0, 200]`, so the oldest one at 100 is deleted but the newest 
one at 200 is kept.
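
The behaviour described above can be reproduced with a small self-contained sketch. `LogHistorySketch` is a hypothetical, simplified class rather than the PR's `TreeMapLogHistory`; in particular the loop body is an assumption, because the quoted diff is cut off at the `while` line.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified history keyed by offset.
final class LogHistorySketch<T> {
    private final NavigableMap<Long, T> history = new TreeMap<>();

    void addAt(long offset, T value) {
        history.put(offset, value);
    }

    Optional<T> valueAtOrBefore(long offset) {
        return Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue);
    }

    // Drops every entry at or before startOffset except the newest of them,
    // so that valueAtOrBefore(startOffset) still has an answer.
    void truncateOldEntries(long startOffset) {
        NavigableMap<Long, T> lesserValues = history.headMap(startOffset, true);
        while (lesserValues.size() > 1) {
            // The view is backed by history, so this also removes the entry from history.
            lesserValues.pollFirstEntry();
        }
    }
}
```

Keeping one entry at or before the start offset means a reader positioned exactly at the new start can still look up the value that was in effect there.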






Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]

2024-05-09 Thread via GitHub


lianetm commented on code in PR #15897:
URL: https://github.com/apache/kafka/pull/15897#discussion_r1595849504


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -219,8 +221,8 @@ public static class Builder {
 
 private Builder() {}
 
-public Builder setType(Type type) {
-this.type = type;
+public Builder setTypes(Set types) {
+this.types = Collections.unmodifiableSet(new HashSet<>(types));

Review Comment:
   `Collections.unmodifiableSet` will create a new object from the provided 
argument, so why create a new `HashSet` instead of simply using 
`Collections.unmodifiableSet(types)`?
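
As background for this question: the JDK documents `Collections.unmodifiableSet` as returning an unmodifiable view of the given set, not a copy, so the two variants can behave differently if the caller later mutates the set it passed in. A minimal sketch of the difference, where `UnmodifiableSetSketch`, `viewOf` and `copyOf` are hypothetical names and `String` stands in for `Type`:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class UnmodifiableSetSketch {
    // Wraps the caller's set: later changes to the argument show through the wrapper.
    static Set<String> viewOf(Set<String> types) {
        return Collections.unmodifiableSet(types);
    }

    // Copies first, then wraps: later changes to the argument are not visible.
    static Set<String> copyOf(Set<String> types) {
        return Collections.unmodifiableSet(new HashSet<>(types));
    }
}
```

Whether the extra copy is worth it depends on whether callers of the builder are expected to keep and modify the set they pass in.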






[jira] [Assigned] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation

2024-05-09 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-15905:
-

Assignee: Edoardo Comar

> Restarts of MirrorCheckpointTask should not permanently interrupt offset 
> translation
> 
>
> Key: KAFKA-15905
> URL: https://issues.apache.org/jira/browse/KAFKA-15905
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Edoardo Comar
>Priority: Major
>
> Executive summary: When the MirrorCheckpointTask restarts, it loses the state 
> of checkpointsPerConsumerGroup, which limits offset translation to records 
> mirrored after the latest restart.
> For example, if 1000 records are mirrored and the OffsetSyncs are read by 
> MirrorCheckpointTask, the emitted checkpoints are cached, and translation can 
> happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more 
> records are mirrored, translation can happen at the ~1500th record, but no 
> longer at the ~500th record.
> Context:
> Before KAFKA-13659, MM2 made translation decisions based on the 
> incompletely-initialized OffsetSyncStore, and the checkpoint could appear to 
> go backwards temporarily during restarts. To fix this, we forced the 
> OffsetSyncStore to initialize completely before translation could take place, 
> ensuring that the latest OffsetSync had been read, and thus providing the 
> most accurate translation.
> Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. 
> Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to 
> allow for translation of earlier offsets. This came with the caveat that the 
> cache's sparseness allowed translations to go backwards permanently. To 
> prevent this behavior, a cache of the latest Checkpoints was kept in the 
> MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset 
> translation remained restricted to the fully-initialized OffsetSyncStore.
> Effectively, the MirrorCheckpointTask ensures that it translates based on an 
> OffsetSync emitted during its lifetime, to ensure that no previous 
> MirrorCheckpointTask emitted a later sync. If we can read the checkpoints 
> emitted by previous generations of MirrorCheckpointTask, we can still ensure 
> that checkpoints are monotonic, while allowing translation further back in 
> history.





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-09 Thread via GitHub


gharris1727 commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2103188026

   Hey @edoardocomar and @prestona thanks for the PR!
   
   One of the reasons I thought this might require a KIP is because it requires 
additional permissions that the current MM2 doesn't need: If an operator has 
already configured ACLs such that MM2 has write permissions for the checkpoints 
topic but no read permissions, it could be operating today and then failing 
after an upgrade with this change. I don't know if that is a common 
configuration or even a recommended one, but it does seem possible in the wild.
   
   Perhaps this can be configuration-less and backwards-compatible if we 
fall back to the old behavior if reading the checkpoints fails for any reason, 
including insufficient permissions.
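
One possible shape for such a fallback is sketched below. This is only an assumption about how it could be structured: `CheckpointLoaderSketch`, `loadOrFallback` and the `Map<String, Long>` cache are hypothetical and not part of MM2.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

final class CheckpointLoaderSketch {
    // Tries to load previously emitted checkpoints. On any runtime failure
    // (for example, missing read permission) it returns an empty cache,
    // which corresponds to starting without any earlier checkpoints.
    static Map<String, Long> loadOrFallback(Supplier<Map<String, Long>> reader) {
        try {
            return reader.get();
        } catch (RuntimeException e) {
            return Collections.emptyMap();
        }
    }
}
```

A real implementation would probably also log the failure so that operators can tell that translation further back in history is unavailable.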





Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595796530


##
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##
@@ -850,14 +846,10 @@ public String toString() {
 }
 
 private static class InflightRequest {
-final int correlationId;
 final int sourceId;
-final int destinationId;
 
 private InflightRequest(int correlationId, int sourceId, int 
destinationId) {
-this.correlationId = correlationId;

Review Comment:
   I agree. Removed.






Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595793667


##
raft/src/test/java/org/apache/kafka/raft/ElectionStateTest.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.generated.QuorumStateData;
+import org.apache.kafka.raft.internals.ReplicaKey;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class ElectionStateTest {
+@Test
+void testVotedCandidateWithoutVotedId() {
+ElectionState electionState = ElectionState.withUnknownLeader(5, 
Collections.emptySet());
+assertFalse(electionState.isVotedCandidate(ReplicaKey.of(1, 
Optional.empty())));
+}
+
+@Test
+void testVotedCandidateWithoutVotedDirectoryId() {
+ElectionState electionState = ElectionState.withVotedCandidate(
+5,
+ReplicaKey.of(1, Optional.empty()),
+Collections.emptySet()
+);
+assertTrue(electionState.isVotedCandidate(ReplicaKey.of(1, 
Optional.empty())));
+assertTrue(
+electionState.isVotedCandidate(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid())))
+);
+}
+
+@Test
+void testVotedCandidateWithVotedDirectoryId() {
+ReplicaKey votedKey = ReplicaKey.of(1, Optional.of(Uuid.randomUuid()));
+ElectionState electionState = ElectionState.withVotedCandidate(
+5,
+votedKey,
+Collections.emptySet()
+);
+assertFalse(electionState.isVotedCandidate(ReplicaKey.of(1, 
Optional.empty())));
+assertTrue(electionState.isVotedCandidate(votedKey));
+}
+
+@ParameterizedTest
+@ValueSource(shorts = {0, 1})
+void testQuorumStateDataRoundTrip(short version) {
+ReplicaKey votedKey = ReplicaKey.of(1, Optional.of(Uuid.randomUuid()));
+List electionStates = Arrays.asList(
+ElectionState.withUnknownLeader(5, Utils.mkSet(1, 2, 3)),
+ElectionState.withElectedLeader(5, 1, Utils.mkSet(1, 2, 3)),
+ElectionState.withVotedCandidate(5, votedKey, Utils.mkSet(1, 2, 3))
+);
+
+final List expected;
+if (version == 0) {
+expected = Arrays.asList(
+ElectionState.withUnknownLeader(5, Utils.mkSet(1, 2, 3)),
+ElectionState.withElectedLeader(5, 1, Utils.mkSet(1, 2, 3)),
+ElectionState.withVotedCandidate(
+5,
+ReplicaKey.of(1, Optional.empty()),
+Utils.mkSet(1, 2, 3)
+)
+);
+} else {
+expected = Arrays.asList(
+ElectionState.withUnknownLeader(5, Collections.emptySet()),
+ElectionState.withElectedLeader(5, 1, Collections.emptySet()),
+ElectionState.withVotedCandidate(5, votedKey, 
Collections.emptySet())

Review Comment:
   `ElectionState` is the in-memory representation of the `quorum-state` file 
or `QuorumStateData`. When version 1 is used, `votedDirectoryId` is persisted 
but `voters` is not. When version 0 is used, `voters` is persisted but 
`votedDirectoryId` is not.
   
   The reason why we need to keep persisting `voters` in version 0 is because I 
want to allow the user to upgrade to the 3.8 JAR without upgrading to 
kraft.version 1 and then downgrade to the 3.7 JAR. To allow this, Kafka 3.8 
needs to keep persisting the `voters` even though that version of Kafka doesn't 
use the `voters` field. 
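
The version-dependent persistence described above can be summarised in a small sketch. `PersistedStateSketch` and `forVersion` are hypothetical names used only for illustration, and `java.util.UUID` stands in for Kafka's `Uuid`; this is not the PR's `QuorumStateData` handling.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified model of which fields are written for each version.
final class PersistedStateSketch {
    final Set<Integer> voters;             // written only for version 0
    final Optional<UUID> votedDirectoryId; // written only for version 1

    private PersistedStateSketch(Set<Integer> voters, Optional<UUID> votedDirectoryId) {
        this.voters = voters;
        this.votedDirectoryId = votedDirectoryId;
    }

    static PersistedStateSketch forVersion(
        short version,
        Set<Integer> voters,
        Optional<UUID> votedDirectoryId
    ) {
        if (version == 0) {
            // Version 0 keeps the voters for downgrade compatibility and drops the directory id.
            return new PersistedStateSketch(voters, Optional.empty());
        }
        // Version 1 keeps the voted directory id and drops the voters.
        return new PersistedStateSketch(Collections.emptySet(), votedDirectoryId);
    }
}
```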
   
   If you notice, there is no method `ElectionState::voters()`. So there is no 
way for the latest version of Kafka to use those values. They are just there 
for backwards compatibility if they don't 

Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595784042


##
raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java:
##
@@ -79,20 +81,79 @@ void testRemoveVoter() {
 );
 }
 
+@Test
+void testIsVoterWithDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 
2, 3), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(2, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(4, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testIsVoterWithoutDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 
2, 3), false);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testStandaloneAndOnlyVoter() {
+Map aVoterMap = 
voterMap(Arrays.asList(1), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Optional.empty())));
+assertFalse(
+voterSet.isOnlyVoter(ReplicaKey.of(4, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testOnlyVoter() {

Review Comment:
   Fair. I renamed it to `testNotStandaloneAndIsOnlyVoter`. I meant to test the 
behavior of `isOnlyVoter` when there is more than one voter in the set of 
voters.






Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595780757


##
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -63,7 +66,9 @@ public void tearDown() {
 private QuorumState buildQuorumState(Set voters) {
 return new QuorumState(
 OptionalInt.of(localId),
-voters,
+localDirectoryId,
+() -> VoterSetTest.voterSet(VoterSetTest.voterMap(voters, false)),
+() -> (short) 0,

Review Comment:
   I made the tests parameterized and configured them to run with `kraft.version` 0 
and 1.
   
   In general we didn't really need to test both cases because `kraft.version` 
affects the persisted state not the in-memory state. Since those tests are not 
recreating a `QuorumState` object they don't affect the directory id used.






[PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]

2024-05-09 Thread via GitHub


kamalcph opened a new pull request, #15911:
URL: https://github.com/apache/kafka/pull/15911

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM

2024-05-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16696:
-
Description: 
The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM. Using 
the LocalTieredStorage framework is the preferred way to write the integration 
tests to capture any regression as it uses the internal topic as storage for 
RLMM which is the default implementation. 

  was:
The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM. Using 
the LocalTieredStorage framework is the preferred way to write the integration 
tests to capture any regression as it uses the official topic as storage for 
RLMM.


> Remove the in-memory implementation of RSM and RLMM
> ---
>
> Key: KAFKA-16696
> URL: https://issues.apache.org/jira/browse/KAFKA-16696
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Minor
>
> The in-memory implementation of RSM and RLMM were written to write the 
> unit/integration tests: [https://github.com/apache/kafka/pull/10218]
> This is not used by any of the tests and superseded by the LocalTieredStorage 
> framework which uses local-disk as secondary storage and topic as RLMM. Using 
> the LocalTieredStorage framework is the preferred way to write the 
> integration tests to capture any regression as it uses the internal topic as 
> storage for RLMM which is the default implementation. 





[jira] [Updated] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM

2024-05-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16696:
-
Description: 
The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM. Using 
the LocalTieredStorage framework is the preferred way to write the integration 
tests to capture any regression as it uses the official topic as storage for 
RLMM.

  was:
The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM.


> Remove the in-memory implementation of RSM and RLMM
> ---
>
> Key: KAFKA-16696
> URL: https://issues.apache.org/jira/browse/KAFKA-16696
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Minor
>
> The in-memory implementation of RSM and RLMM were written to write the 
> unit/integration tests: [https://github.com/apache/kafka/pull/10218]
> This is not used by any of the tests and superseded by the LocalTieredStorage 
> framework which uses local-disk as secondary storage and topic as RLMM. Using 
> the LocalTieredStorage framework is the preferred way to write the 
> integration tests to capture any regression as it uses the official topic as 
> storage for RLMM.





[jira] [Created] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM

2024-05-09 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16696:


 Summary: Remove the in-memory implementation of RSM and RLMM
 Key: KAFKA-16696
 URL: https://issues.apache.org/jira/browse/KAFKA-16696
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM.





[jira] [Updated] (KAFKA-16693) Kafka Users are created with ACL entries and during performing operations allowed by ACL we see Denied Operation

2024-05-09 Thread Janardhana Gopalachar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Janardhana Gopalachar updated KAFKA-16693:
--
Description: 
Hi

We have created 2 KafkaUsers with the Strimzi operator for 2 different clusters, with 
ACL entries. We have observed that the ACL entries are not present in the Kafka cluster 
for approximately 4 minutes and only become 
available after that. Clients that use the KafkaUser to perform 
operations are not able to do so until the ACLs are available in the 
Kafka cluster.

The Kafka logs show the messages below. The user and cluster details are 
omitted from the messages because they are proprietary.

 

Processing notification(s) to /config/change

Processing override for entityPath

Removing PRODUCE quota for user

Removing FETCH quota for user

Removing REQUEST quota for user

Removing CONTROLLER_MUTATION quota for user

Processing notification(s) to /kafka-acl-changes

Processing Acl change notification for

Processing notification(s) to /config/changes

Processing notification(s) to /kafka-acl-changes

Processing Acl change notification for ResourcePattern(resourceType=GROU

 

The same behavior is observed even if we create a single KafkaUser.

 

  was:
Hi

We have created 2 KafkaUsers from the Strimzi operator for 2 different clusters 
with ACL entries. We have observed that the ACL entries are not present in the 
Kafka cluster for approximately 4 minutes and become available only after 
that. Clients that use a KafkaUser cannot perform operations until the ACLs 
are available in the Kafka cluster.

Below are the messages we see in the Kafka logs. The user and cluster details 
are omitted from the messages since they are proprietary.

 

Processing notification(s) to /config/change

Processing override for entityPath

Removing PRODUCE quota for user

Removing FETCH quota for user

Removing REQUEST quota for user

Removing CONTROLLER_MUTATION quota for user

Processing notification(s) to /kafka-acl-changes

Processing Acl change notification for

Processing notification(s) to /config/changes

Processing notification(s) to /kafka-acl-changes

Processing Acl change notification for ResourcePattern(resourceType=GROU

 

 


> Kafka Users are created with ACL entries and during performing operations 
> allowed by ACL we see Denied Operation
> 
>
> Key: KAFKA-16693
> URL: https://issues.apache.org/jira/browse/KAFKA-16693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.1
>Reporter: Janardhana Gopalachar
>Priority: Blocker
>
> Hi
> We have created 2 KafkaUsers from the Strimzi operator for 2 different 
> clusters with ACL entries. We have observed that the ACL entries are not 
> present in the Kafka cluster for approximately 4 minutes and become available 
> only after that. Clients that use a KafkaUser cannot perform operations until 
> the ACLs are available in the Kafka cluster.
> Below are the messages we see in the Kafka logs. The user and cluster details 
> are omitted from the messages since they are proprietary.
>  
> Processing notification(s) to /config/change
> Processing override for entityPath
> Removing PRODUCE quota for user
> Removing FETCH quota for user
> Removing REQUEST quota for user
> Removing CONTROLLER_MUTATION quota for user
> Processing notification(s) to /kafka-acl-changes
> Processing Acl change notification for
> Processing notification(s) to /config/changes
> Processing notification(s) to /kafka-acl-changes
> Processing Acl change notification for ResourcePattern(resourceType=GROU
>  
> The same behavior is observed even if we create a single KafkaUser.
>  





[jira] [Assigned] (KAFKA-7300) Add KafkaConsumer fetch-error-rate and fetch-error-total metrics

2024-05-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-7300:
-

Assignee: Philip Nee  (was: Kevin Lu)

> Add KafkaConsumer fetch-error-rate and fetch-error-total metrics 
> -
>
> Key: KAFKA-7300
> URL: https://issues.apache.org/jira/browse/KAFKA-7300
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer, metrics
>Reporter: Kevin Lu
>Assignee: Philip Nee
>Priority: Minor
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-356%3A+Add+KafkaConsumer+fetch-error-rate+and+fetch-error-total+metrics]
>  
> The KafkaConsumer is a complex client that requires many different components 
> to function properly. When a consumer is not operating properly, it can be 
> difficult to identify the root cause and which component is causing issues 
> (ConsumerCoordinator, Fetcher, ConsumerNetworkClient, etc).
>  
> This aims to improve the monitoring and detection of KafkaConsumer’s Fetcher 
> component.
>  
> Fetcher will send a fetch request for each node that the consumer has 
> assigned partitions for.
>  
> This fetch request may fail under the following cases:
>  * Intermittent network issues (goes to onFailure)
>  * Node sent an invalid full/incremental fetch response 
> (FetchSessionHandler’s handleResponse returns false)
>  * FetchSessionIdNotFound
>  * InvalidFetchSessionEpochException
>  
> These cases are logged, but it would be valuable to provide a corresponding 
> metric that allows for monitoring and alerting.
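
As a rough illustration of the kind of bookkeeping such a metric needs, here is a minimal sketch that counts the failure cases listed above and derives a total and a simple rate. It uses plain JDK counters rather than Kafka's metrics classes; the class name, the enum constants, and the rate calculation are assumptions made for illustration and are not taken from KIP-356 or the consumer code.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: not Kafka's Fetcher or metrics implementation.
final class FetchErrorCounterSketch {
    // One constant per failure case named in the issue description (names are illustrative).
    enum FetchFailure {
        NETWORK_FAILURE,
        INVALID_FETCH_RESPONSE,
        FETCH_SESSION_ID_NOT_FOUND,
        INVALID_FETCH_SESSION_EPOCH
    }

    private final Map<FetchFailure, LongAdder> counts = new EnumMap<>(FetchFailure.class);
    private final LongAdder total = new LongAdder();

    FetchErrorCounterSketch() {
        // Pre-populate so record() never has to modify the map structure.
        for (FetchFailure failure : FetchFailure.values()) {
            counts.put(failure, new LongAdder());
        }
    }

    // Call this wherever a failed fetch is currently only logged.
    void record(FetchFailure failure) {
        counts.get(failure).increment();
        total.increment();
    }

    // Analogous to a "fetch-error-total" value.
    long total() {
        return total.sum();
    }

    long count(FetchFailure failure) {
        return counts.get(failure).sum();
    }

    // Analogous to a "fetch-error-rate" value: errors per second over the given window.
    double ratePerSecond(long windowMs) {
        return windowMs <= 0 ? 0.0 : total() * 1000.0 / windowMs;
    }

    public static void main(String[] args) {
        FetchErrorCounterSketch counter = new FetchErrorCounterSketch();
        counter.record(FetchFailure.NETWORK_FAILURE);
        counter.record(FetchFailure.FETCH_SESSION_ID_NOT_FOUND);
        System.out.println("total=" + counter.total());
    }
}
```

A real implementation would register sensors with the client's metrics registry instead of exposing raw counters, but the recording points would be the same failure paths.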





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103078761

   Test failures are unrelated.





Re: [PR] KAFKA-16686: Wait for given offset in TopicBasedRemoteLogMetadataManagerTest [kafka]

2024-05-09 Thread via GitHub


satishd commented on code in PR #15885:
URL: https://github.com/apache/kafka/pull/15885#discussion_r1595738359


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##
@@ -152,15 +152,17 @@ public void testNewPartitionUpdates() throws Exception {
 
 // RemoteLogSegmentMetadata events are already published, and 
topicBasedRlmm's consumer manager will start
 // fetching those events and build the cache.
-waitUntilConsumerCatchesUp(newLeaderTopicIdPartition, 
newFollowerTopicIdPartition, 30_000L);
-
+waitUntilConsumerCatchesUp(newLeaderTopicIdPartition, 
newFollowerTopicIdPartition, 30_000L, 0, 0);
 
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
 
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
 }
 
 private void waitUntilConsumerCatchesUp(TopicIdPartition 
newLeaderTopicIdPartition,
 TopicIdPartition 
newFollowerTopicIdPartition,
-long timeoutMs) throws 
TimeoutException {
+long timeoutMs,
+long 
targetLeaderMetadataPartitionOffset,

Review Comment:
   These parameters will not help much here: this method was written for 
`testNewPartitionUpdates`, but other tests in this class use the same 
functionality with gaps. It would be better to revisit those use cases and 
refactor this method accordingly. 
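
For readers following the discussion, waiting for a given offset can be sketched generically as below. This is a minimal illustration, not the helper in `TopicBasedRemoteLogMetadataManagerTest`: the class name, method name, and the `LongSupplier` used to read the current offset are all hypothetical.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: a generic "wait until an offset is reached" helper for tests.
final class WaitForOffsetSketch {
    /**
     * Polls currentOffset until it is at least targetOffset.
     * Returns true if the target was reached, false on timeout or interruption.
     */
    static boolean waitUntilOffsetReached(LongSupplier currentOffset,
                                          long targetOffset,
                                          long timeoutMs,
                                          long pollIntervalMs) {
        long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        while (currentOffset.getAsLong() < targetOffset) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false; // timed out before the target offset was observed
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] offset = {0};
        // The supplier advances by one on every read, so the target is reached quickly.
        System.out.println(waitUntilOffsetReached(() -> offset[0]++, 3L, 5_000L, 1L));
    }
}
```

Taking the offset reader as a parameter keeps the helper independent of any one test, which is the kind of refactoring the comment above asks for.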






Re: [PR] KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. [kafka]

2024-05-09 Thread via GitHub


apourchet commented on code in PR #15887:
URL: https://github.com/apache/kafka/pull/15887#discussion_r1595700958


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedSet;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
+
+/**
+ * A read-only metadata class representing the current state of each 
KafkaStreams client with at least one StreamThread participating in this 
rebalance
+ */
+public interface KafkaStreamsState {
+/**
+ * @return the processId of the application instance running on this 
KafkaStreams client
+ */
+ProcessID processId();
+
+/**
+ * Returns the number of processing threads available to work on tasks for 
this KafkaStreams client,
+ * which represents its overall capacity for work relative to other 
KafkaStreams clients.
+ *
+ * @return the number of processing threads on this KafkaStreams client
+ */
+int numProcessingThreads();
+
+/**
+ * @return the set of consumer client ids for this KafkaStreams client
+ */
+SortedSet<String> consumerClientIds();
+
+/**
+ * @return the set of all active tasks owned by consumers on this 
KafkaStreams client since the previous rebalance
+ */
+SortedSet<TaskId> previousActiveTasks();
+
+/**
+ * @return the set of all standby tasks owned by consumers on this 
KafkaStreams client since the previous rebalance
+ */
+SortedSet<TaskId> previousStandbyTasks();
+
+/**
+ * Returns the total lag across all logged stores in the task. Equal to 
the end offset sum if this client
+ * did not have any state for this task on disk.
+ *
+ * @return end offset sum - offset sum
+ *Task.LATEST_OFFSET if this was previously an active 
running task on this client

Review Comment:
   IllegalArgumentException usually refers to an argument that was passed in, 
as opposed to some global misconfiguration, so I'll go for 
UnsupportedOperationException unless someone else feels strongly about it.
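
The distinction can be sketched as follows. This is a minimal illustration only: the class, the constructor flag, and the placeholder return value are hypothetical and are not the interface under review.

```java
// Hypothetical sketch of choosing between the two exception types.
final class ExceptionChoiceSketch {
    // Stands in for a global configuration setting (assumed name).
    private final boolean lagComputationRequested;

    ExceptionChoiceSketch(boolean lagComputationRequested) {
        this.lagComputationRequested = lagComputationRequested;
    }

    long lagFor(String taskId) {
        // The caller passed a bad argument: IllegalArgumentException fits.
        if (taskId == null || taskId.isEmpty()) {
            throw new IllegalArgumentException("taskId must be non-empty");
        }
        // The argument is fine, but the operation is unavailable under the current
        // configuration: UnsupportedOperationException fits.
        if (!lagComputationRequested) {
            throw new UnsupportedOperationException(
                "lag computation was not requested in the configuration");
        }
        return 0L; // placeholder lag value for the sketch
    }

    public static void main(String[] args) {
        System.out.println(new ExceptionChoiceSketch(true).lagFor("0_1"));
    }
}
```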






Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]

2024-05-09 Thread via GitHub


junrao commented on code in PR #15904:
URL: https://github.com/apache/kafka/pull/15904#discussion_r1595670550


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -414,7 +414,15 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)
+// V2: Only one batch is in the records, so the shallow 
OffsetOfMaxTimestamp is the last offset of the single batch
+// V1: 3 batches are in the records, so the shallow OffsetOfMaxTimestamp 
is the timestamp of branch-1

Review Comment:
   Hmm, what is branch?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1177,6 +1177,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validBytesCount += batchSize
 
   val batchCompression = CompressionType.forId(batch.compressionType.id)
+  // V2: only one batch regardless of compression

Review Comment:
   This is not very accurate. `analyzeAndValidateRecords()` is called in the 
follower append too, which could include more than one batch. It's just that 
sourceCompression is only used on the leader path, which only contains one 
batch currently.






[PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-09 Thread via GitHub


edoardocomar opened a new pull request, #15910:
URL: https://github.com/apache/kafka/pull/15910

   KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently 
interrupt offset translation
   
   MirrorCheckpointTask reloads the last checkpoint at start, and 
OffsetSyncStore stores OffsetSyncs before reading to the log end.
   
   Add a test case simulating a restarted task where the store is reinitialized 
with later OffsetSyncs, and check that emitted Checkpoints do not rewind.
   
   Also addresses KAFKA-16622 (MirrorMaker2 first Checkpoint not emitted until 
consumer group fully catches up once), because the OffsetSyncStore is 
populated before reading to the log end.
   
   Co-Authored-By: Adrian Preston 
   





[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-09 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845045#comment-17845045
 ] 

Philip Nee commented on KAFKA-16687:


Since you mentioned that disabling the JMX reporter resolves the issue, I was 
wondering if this is caused by leaking network connections, possibly through 
the sticky node used by the telemetry sender: 
[https://github.com/apache/kafka/commit/2b99d0e45027c88ae2347fa8f7d1ff4b2b919089]

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project that uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application uses a Kafka Consumer to poll a Kafka broker topic 
> continuously. 
> I configured the application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G and then ran it. 
> The virtual machine has 16G of physical memory. 
> After the application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process started using swap space. 
> I checked with jmap -heap pid and found that heap memory usage is OK. 
> With Native Memory Tracking [jcmd pid Native.memory detail.diff], I 
> found that the growth is in [NMT Internal] memory, which is allocated by 
> Unsafe_allocatememory xxx.
> My application does not use NIO DirectByteBuffer to allocate 
> memory.
> I checked the kafka-clients source code, and it has code that uses 
> "sun.misc.unsafe" to allocate memory. MaxMetaspaceSize does not apply to it. 
>  
> Could you help check this? How can I stop the native memory from growing, to 
> avoid a system hang?  





[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-09 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845042#comment-17845042
 ] 

Philip Nee commented on KAFKA-16687:


Thank you. 

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project that uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application uses a Kafka Consumer to poll a Kafka broker topic 
> continuously. 
> I configured the application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G and then ran it. 
> The virtual machine has 16G of physical memory. 
> After the application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process started using swap space. 
> I checked with jmap -heap pid and found that heap memory usage is OK. 
> With Native Memory Tracking [jcmd pid Native.memory detail.diff], I 
> found that the growth is in [NMT Internal] memory, which is allocated by 
> Unsafe_allocatememory xxx.
> My application does not use NIO DirectByteBuffer to allocate 
> memory.
> I checked the kafka-clients source code, and it has code that uses 
> "sun.misc.unsafe" to allocate memory. MaxMetaspaceSize does not apply to it. 
>  
> Could you help check this? How can I stop the native memory from growing, to 
> avoid a system hang?  





[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16695:
---
Fix Version/s: 3.8.0

> Improve expired poll interval logging by showing exceeded time
> --
>
> Key: KAFKA-16695
> URL: https://issues.apache.org/jira/browse/KAFKA-16695
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When a consumer poll iteration takes longer than the max.poll.interval, the 
> consumer logs a warning suggesting that the max.poll.interval config was 
> exceeded, and proactively leaves the group. The log suggests adjusting the 
> max.poll.interval config, which should help in cases of long processing 
> times. We should consider adding how much the interval was exceeded by, since 
> it could help guide the user to adjust the config effectively. Other clients 
> already do this, logging messages like the following in this situation:
> {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
> max.poll.interval.ms for long-running message processing): leaving 
> group{quote}
>   
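
A minimal sketch of the proposed log improvement, assuming the consumer knows the configured interval and the time elapsed since the last poll. The class name, method name, and message wording are hypothetical and are not the actual consumer code or its log text.

```java
// Hypothetical sketch: builds a warning that states how far the poll interval was exceeded.
final class PollIntervalLogSketch {
    static String expiredPollMessage(long maxPollIntervalMs, long elapsedSinceLastPollMs) {
        // Clamp at zero so a caller that has not actually exceeded the interval gets 0ms.
        long exceededMs = Math.max(0L, elapsedSinceLastPollMs - maxPollIntervalMs);
        return String.format(
            "Consumer poll timeout has expired: max.poll.interval.ms (%dms) exceeded by %dms. "
                + "Consider increasing max.poll.interval.ms for long-running message processing.",
            maxPollIntervalMs, exceededMs);
    }

    public static void main(String[] args) {
        System.out.println(expiredPollMessage(300_000L, 300_255L));
    }
}
```

Reporting the overshoot alongside the configured value lets the user judge whether a small bump to the config is enough or whether processing time needs attention.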





[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16695:
---
Labels: kip-848-client-support  (was: )

> Improve expired poll interval logging by showing exceeded time
> --
>
> Key: KAFKA-16695
> URL: https://issues.apache.org/jira/browse/KAFKA-16695
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> When a consumer poll iteration takes longer than the max.poll.interval, the 
> consumer logs a warning suggesting that the max.poll.interval config was 
> exceeded, and proactively leaves the group. The log suggests adjusting the 
> max.poll.interval config, which should help in cases of long processing 
> times. We should consider adding how much the interval was exceeded by, since 
> it could help guide the user to adjust the config effectively. Other clients 
> already do this, logging messages like the following in this situation:
> {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
> max.poll.interval.ms for long-running message processing): leaving 
> group{quote}
>   





Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-09 Thread via GitHub


lianetm commented on PR #15909:
URL: https://github.com/apache/kafka/pull/15909#issuecomment-2102939146

   Hey @mjsax , here is the improved logging following your suggestion, helpful 
indeed I expect. Would you have a chance to take a look? Thanks!





Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-09 Thread via GitHub


C0urante commented on PR #15893:
URL: https://github.com/apache/kafka/pull/15893#issuecomment-2102926693

   I think this leads to a change in behavior. Right now this test 
(surprisingly!) passes on trunk:
   
   ```java
   public class TimestampConverterTest {
       // ...

       @Test
       public void testWithSchemaFieldWithDefaultValue() {
           Map<String, String> config = new HashMap<>();
           config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
           config.put(TimestampConverter.FIELD_CONFIG, "timestamp_field");
           xformValue.configure(config);

           java.util.Date defaultFieldValue = new java.util.Date();
           Schema schema = SchemaBuilder.struct()
                   .field(
                           "timestamp_field",
                           Timestamp.builder()
                                   .defaultValue(defaultFieldValue)
                                   .build()
                   );
           Struct value = new Struct(schema)
                   .put("timestamp_field", DATE_PLUS_TIME.getTime());

           SourceRecord transformed = xformValue.apply(createRecordWithSchema(schema, value));

           assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
           Struct transformedValue = (Struct) transformed.value();
           assertEquals(DATE_PLUS_TIME.getTime(), transformedValue.get("timestamp_field"));
           assertNull(transformedValue.schema().field("timestamp_field").schema().defaultValue());
       }
   }
   ```
   
   We don't propagate default values for the fields we convert. Instead, we 
automatically substitute in the default value if none is found. This is 
surprising behavior, and has led to things like 
[KIP-581](https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value)
 and 
[KIP-1040](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677),
 but I don't think we should change it in this KIP because it's out of scope 
and, if necessary, can be touched on in KIP-1040 (which is in discussion at the 
moment).





Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595620870


##
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##
@@ -481,7 +517,7 @@ private void durableTransitionTo(EpochState state) {
 }
 }
 
-this.store.writeElectionState(state.election());
+this.store.writeElectionState(state.election(), 
latestKraftVersion.get());

Review Comment:
   Yeah. I am also not a fan of this inconsistency. I try to change them as I 
modify related lines. I'll go through this file and fix all of the ones I 
can find.
   
   I think the convention is to not use `this` unless it is needed to 
disambiguate from a locally scoped variable.
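
A small sketch of that convention, using a made-up class rather than anything from `QuorumState`:

```java
// Hypothetical class illustrating when `this` is needed and when it is not.
final class ThisConventionSketch {
    private int epoch;
    private int votes;

    ThisConventionSketch(int epoch) {
        // `this` is required here: the parameter shadows the field of the same name.
        this.epoch = epoch;
    }

    void recordVote() {
        // No local variable shadows the field, so `this` is omitted.
        votes++;
    }

    int epoch() {
        return epoch;
    }

    int votes() {
        return votes;
    }

    public static void main(String[] args) {
        ThisConventionSketch state = new ThisConventionSketch(5);
        state.recordVote();
        System.out.println(state.epoch() + " " + state.votes());
    }
}
```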






Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595616944


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1700,16 +1710,16 @@ private void handleResponse(RaftResponse.Inbound 
response, long currentTimeMs) {
 }
 
 /**
- * Validate a request which is only valid between voters. If an error is
- * present in the returned value, it should be returned in the response.
+ * Validate common state for requests to establish leadership.
+ *
+ * These include the Vote, BeginQuorumEpoch rnd EndQuorumEpoch RPCs. If an 
error is present in

Review Comment:
   Fixed






Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595615609


##
raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java:
##
@@ -59,17 +54,18 @@
  *   "data_version":0}
  * 
  * */
-public class FileBasedStateStore implements QuorumStateStore {
-private static final Logger log = 
LoggerFactory.getLogger(FileBasedStateStore.class);
+public class FileQuorumStateStore implements QuorumStateStore {

Review Comment:
   Fixed. It looks like we are no longer building the Javadoc HTML pages, so 
I can't test the HTML generated by this change.
   
   I need to investigate this after this PR.






Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-05-09 Thread via GitHub


dajac commented on PR #15835:
URL: https://github.com/apache/kafka/pull/15835#issuecomment-2102880603

   @jeffkbkim The unit test seems to be flaky. Could you please check? 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15785/19/tests





[PR] MINOR: migrate DescribeConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-09 Thread via GitHub


FrankYang0529 opened a new pull request, #15908:
URL: https://github.com/apache/kafka/pull/15908

   By using ClusterTestExtensions, DescribeConsumerGroupTest can get away from 
the KafkaServerTestHarness dependency.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-09 Thread via GitHub


FrankYang0529 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2102879454

   > > This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I 
think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. 
WDYT? Thank you.
   > 
   > That is addressed already. see 
https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L92
   
   Thanks. The `SimpleConsumerGroupExecutor` subscribes to topic partitions. I 
have updated `ConsumerGroupCommandTestUtils` to support it.
   
   
https://github.com/apache/kafka/blob/f4fdaa702a2e718bdb44b9c5fec254f32a33f0d8/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L274-L287





Re: [PR] KAFKA-16660: reduce the check interval to speedup DelegationTokenRequestsTest [kafka]

2024-05-09 Thread via GitHub


brandboat commented on PR #15907:
URL: https://github.com/apache/kafka/pull/15907#issuecomment-2102870594

   Looped 131 times and all runs passed: `I=0; while ./gradlew core:test --tests 
DelegationTokenRequestsTest --rerun --fail-fast; do (( I=$I+1 )); echo 
"Completed run: $I"; sleep 1; done`. But let's see if there is any surprise in 
Jenkins.





Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595591034


##
raft/src/main/java/org/apache/kafka/raft/EpochState.java:
##
@@ -26,15 +27,16 @@ default Optional highWatermark() {
 }
 
 /**
- * Decide whether to grant a vote to a candidate, it is the responsibility 
of the caller to invoke
+ * Decide whether to grant a vote to a candidate.
+ *
+ * It is the responsibility of the caller to invoke

Review Comment:
   This is the 
[method](https://github.com/apache/kafka/pull/15859/files#diff-095b9e60b0227d41f24b49923afddbe813d5c5754cd0ef4801741f1d275319b6R347-R350).
   
   I changed the signature in this PR so I just changed the documentation to 
match the new signature. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16660: reduce the check interval to speedup DelegationTokenRequestsTest [kafka]

2024-05-09 Thread via GitHub


brandboat opened a new pull request, #15907:
URL: https://github.com/apache/kafka/pull/15907

   related to https://issues.apache.org/jira/browse/KAFKA-16660
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-05-09 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1595590794


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -411,25 +394,32 @@ public void seek(final TopicPartition partition, final 
long offset) {
 
 shouldNotSeek.set(new AssertionError("Should not seek"));
 
+@SuppressWarnings("unchecked")
 final java.util.function.Consumer> resetter =
-EasyMock.mock(java.util.function.Consumer.class);
-resetter.accept(Collections.singleton(partition1));
-EasyMock.expectLastCall();
-EasyMock.replay(resetter);
+mock(java.util.function.Consumer.class);

Review Comment:
   Heya @chia7712 and thank you for the review and the good suggestions! 
Apologies for not responding sooner, I somehow missed it. The newest commits 
ought to address both of your suggestions.






[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-09 Thread Johnson Okorie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnson Okorie updated KAFKA-16692:
---
Description: 
We have a Kafka cluster running on version 3.5.2 that we are upgrading to 
3.6.1. This cluster has a lot of clients with exactly-once semantics enabled and 
hence creating transactions. As we replaced brokers with the new binaries, we 
observed lots of clients in the cluster experiencing the following error:
{code:java}
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
transactionalId=] Got error produce response with correlation 
id 6402937 on topic-partition , retrying (2147483512 attempts 
left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before 
a response was received.{code}
On inspecting the broker, we saw the following errors on brokers still running 
Kafka version 3.5.2:

 
{code:java}
message:     
Closing socket for  because of error
exception_exception_class:    
org.apache.kafka.common.errors.InvalidRequestException
exception_exception_message:    
Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
enabled
exception_stacktrace:    
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
{code}
On the new brokers running 3.6.1 we saw the following errors:

 
{code:java}
[AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
node 1043 with a network exception.{code}
 

I can also see this:
{code:java}
[AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being 
disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, 
request timeout: 3ms){code}
We started investigating this issue and, digging through the changes in 3.6, we 
came across some changes introduced as part of KAFKA-14402 that we thought 
might lead to this behaviour. 

First, we could see that _transaction.partition.verification.enable_ is enabled 
by default and enables a new code path that culminates in us sending version 4 
ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
[here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].

From a 
[discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
possible as the following code paths should prevent version 4 
ADD_PARTITIONS_TO_TXN requests being sent to other brokers:

[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
 
[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]

However, these requests are still sent to other brokers in our environment.

On further inspection of the code, I am wondering if the following code path 
could lead to this issue:

[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]

In this scenario, we don't have any _NodeApiVersions_ available for the 
specified nodeId and are potentially skipping the _latestUsableVersion_ check. I 
am wondering if it is possible that, because _discoverBrokerVersions_ is set to 
_false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it skips 
fetching {_}NodeApiVersions{_}? I can see that we create the network client 
here:

[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]

The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
that has _discoverBrokerVersions_ set to {_}false{_}. 

I was hoping I could get some assistance debugging this issue. Happy to provide 
any additional information needed.
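
The suspected code path above can be sketched in isolation. This is a minimal model, not Kafka's implementation: the class {{VersionSelectionSketch}}, its nested {{AdvertisedVersions}} type, the method names, and the version numbers are all hypothetical and chosen only for illustration. It shows how a sender that has no version information for a node would fall back to the newest version it knows about, while a sender that does have the information would cap the version at what the remote node advertises.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model of request-version selection; these are not Kafka's classes. */
final class VersionSelectionSketch {

    /** Stand-in for per-node version info: the highest version the remote node advertises per API. */
    static final class AdvertisedVersions {
        private final Map<String, Short> maxVersionByApi = new HashMap<>();

        AdvertisedVersions advertise(String api, int maxVersion) {
            maxVersionByApi.put(api, (short) maxVersion);
            return this;
        }

        /** Caps the requested version at what the remote node advertises. */
        short latestUsableVersion(String api, short latestAllowed) {
            Short remoteMax = maxVersionByApi.get(api);
            if (remoteMax == null) {
                throw new IllegalStateException("Node does not advertise " + api);
            }
            return (short) Math.min(remoteMax, latestAllowed);
        }
    }

    /**
     * Picks the version to send. With no version info for the node, the cap is
     * skipped and the sender's newest allowed version is used as-is.
     */
    static short chooseVersion(Optional<AdvertisedVersions> known, String api, short latestAllowed) {
        if (!known.isPresent()) {
            return latestAllowed; // no check against the remote node is possible here
        }
        return known.get().latestUsableVersion(api, latestAllowed);
    }
}
```

Under these assumptions, a node that advertises at most version 3 would receive version 3 when its versions are known, and version 4 when they are not, which would match the symptom reported above if the hypothesis holds.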

 

 

 

  was:
We have a kafka cluster running on version 3.5.2 that we are upgrading to 
3.6.1. This cluster has a lot of clients with exactly one semantics enabled and 
hence creating transactions. As we replaced brokers with the new binaries, we 
observed lots of clients in the cluster experiencing the following error:
{code:java}
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
transactionalId=] Got error produce response with correlation 
id 6402937 on topic-partition , retrying (2147483512 attempts 
left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before 
a response was received.{code}
On inspecting the broker, we saw the following errors on brokers still running 
Kafka version 3.5.2:

 
{code:java}
message:     
Closing socket for  because of error
exception_exception_class:    

[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-09 Thread Johnson Okorie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnson Okorie updated KAFKA-16692:
---
Description: 
We have a Kafka cluster running on version 3.5.2 that we are upgrading to 
3.6.1. This cluster has a lot of clients with exactly-once semantics enabled and 
hence creating transactions. As we replaced brokers with the new binaries, we 
observed lots of clients in the cluster experiencing the following error:
{code:java}
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
transactionalId=] Got error produce response with correlation 
id 6402937 on topic-partition , retrying (2147483512 attempts 
left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before 
a response was received.{code}
On inspecting the broker, we saw the following errors on brokers still running 
Kafka version 3.5.2:

 
{code:java}
message:     
Closing socket for  because of error
exception_exception_class:    
org.apache.kafka.common.errors.InvalidRequestException
exception_exception_message:    
Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
enabled
exception_stacktrace:    
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
{code}
On the new brokers running 3.6.1 we saw the following errors:

 
{code:java}
[AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
node 1043 with a network exception.{code}
 

I can also see this:
{code:java}
[AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being 
disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, 
request timeout: 3ms){code}
We started investigating this issue and, digging through the changes in 3.6, we 
came across some changes introduced as part of KAFKA-14402 that we thought 
might lead to this behaviour. 

First, we could see that _transaction.partition.verification.enable_ is enabled 
by default and enables a new code path that culminates in us sending version 4 
ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
[here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].

From a 
[discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
possible as the following code paths should prevent version 4 
ADD_PARTITIONS_TO_TXN requests being sent to other brokers:

[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
 
[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]

However, these requests are still sent to other brokers in our environment.

On further inspection of the code, I am wondering if the following code path 
could lead to this issue:

[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]

In this scenario, we don't have any _NodeApiVersions_ available for the 
specified nodeId and are potentially skipping the _latestUsableVersion_ check. I 
am wondering if it is possible that, because _discoverBrokerVersions_ is set to 
_false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it skips 
fetching {_}NodeApiVersions{_}? I can see that we create the network client 
here:

[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]

The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
that has _discoverBrokerVersions_ set to {_}false{_}. 

I was hoping I could get some assistance debugging this issue. Happy to provide 
any additional information needed.

 

 

 

  was:
We have a kafka cluster running on version 3.5.2 that we are upgrading to 
3.6.1. This cluster has a lot of clients with exactly one semantics enabled and 
hence creating transactions. As we replaced brokers with the new binaries, we 
observed lots of clients in the cluster experiencing the following error:
{code:java}
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
transactionalId=] Got error produce response with correlation 
id 6402937 on topic-partition , retrying (2147483512 attempts 
left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before 
a response was received.{code}
On inspecting the broker, we saw the following errors on brokers still running 
Kafka version 3.5.2:

 
{code:java}
message:     
Closing socket for  because of error
exception_exception_class:    

Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595585037


##
raft/src/main/java/org/apache/kafka/raft/ElectionState.java:
##
@@ -115,15 +166,51 @@ public boolean equals(Object o) {
 ElectionState that = (ElectionState) o;
 
 if (epoch != that.epoch) return false;
-if (!leaderIdOpt.equals(that.leaderIdOpt)) return false;
-return votedIdOpt.equals(that.votedIdOpt);
+if (!leaderId.equals(that.leaderId)) return false;
+if (!votedKey.equals(that.votedKey)) return false;
+
+return voters.equals(that.voters);
 }
 
 @Override
 public int hashCode() {
-int result = epoch;
-result = 31 * result + leaderIdOpt.hashCode();
-result = 31 * result + votedIdOpt.hashCode();
-return result;
+return Objects.hash(epoch, leaderId, votedKey, voters);
+}
+
+public static ElectionState withVotedCandidate(int epoch, ReplicaKey 
votedKey, Set voters) {
+if (votedKey.id() < 0) {
+throw new IllegalArgumentException("Illegal voted Id " + 
votedKey.id() + ": must be non-negative");
+}
+
+return new ElectionState(epoch, OptionalInt.empty(), 
Optional.of(votedKey), voters);
+}
+
+public static ElectionState withElectedLeader(int epoch, int leaderId, 
Set voters) {
+if (leaderId < 0) {
+throw new IllegalArgumentException("Illegal leader Id " + leaderId 
+ ": must be non-negative");
+}
+
+return new ElectionState(epoch, OptionalInt.of(leaderId), 
Optional.empty(), voters);
+}
+
+public static ElectionState withUnknownLeader(int epoch, Set 
voters) {
+return new ElectionState(epoch, OptionalInt.empty(), Optional.empty(), 
voters);
+}
+
+public static ElectionState fromQuorumStateData(QuorumStateData data) {
+Optional votedDirectoryId = 
data.votedDirectoryId().equals(noVotedDirectoryId) ?
+Optional.empty() :
+Optional.of(data.votedDirectoryId());
+
+Optional voterKey = data.votedId() == notVoted ?

Review Comment:
   Fixed.






[jira] [Created] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16695:
--

 Summary: Improve expired poll interval logging by showing exceeded 
time
 Key: KAFKA-16695
 URL: https://issues.apache.org/jira/browse/KAFKA-16695
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


When a consumer poll iteration takes longer than max.poll.interval.ms, the 
consumer logs a warning suggesting that the max.poll.interval.ms config was 
exceeded, and proactively leaves the group. The log suggests adjusting 
max.poll.interval.ms, which should help in cases of long processing times. We 
should consider adding how much time the interval was exceeded by, since it 
could help guide the user to adjust the config effectively. Other clients 
already do this and log messages like the following in this situation:
{quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
max.poll.interval.ms for long-running message processing): leaving group{quote}
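
A minimal sketch of how such a message could be built, assuming the caller already knows the configured interval and the time elapsed since the last poll. The class name {{PollIntervalLogSketch}}, the method {{expiredMessage}}, and its parameters are hypothetical and are not part of the Kafka consumer; the wording simply mirrors the quoted message from other clients.

```java
/** Hypothetical helper; not part of the Kafka consumer. */
final class PollIntervalLogSketch {

    /**
     * Formats a warning that reports both the configured interval and how far it was exceeded.
     *
     * @param maxPollIntervalMs      the configured max.poll.interval.ms
     * @param elapsedSinceLastPollMs time elapsed since the previous poll call
     */
    static String expiredMessage(long maxPollIntervalMs, long elapsedSinceLastPollMs) {
        long exceededByMs = elapsedSinceLastPollMs - maxPollIntervalMs;
        if (exceededByMs <= 0) {
            throw new IllegalArgumentException("poll interval was not exceeded");
        }
        return String.format(
            "Application maximum poll interval (%dms) exceeded by %dms "
                + "(adjust max.poll.interval.ms for long-running message processing): leaving group",
            maxPollIntervalMs, exceededByMs);
    }
}
```

Reporting the overshoot next to the configured value lets a user judge whether a small bump to the config would be enough or whether processing time is far beyond the limit.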
  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595581356


##
raft/src/main/java/org/apache/kafka/raft/ElectionState.java:
##
@@ -64,47 +62,100 @@ public boolean isLeader(int nodeId) {
 return leaderIdOrSentinel() == nodeId;
 }
 
-public boolean isVotedCandidate(int nodeId) {
-if (nodeId < 0)
-throw new IllegalArgumentException("Invalid negative nodeId: " + 
nodeId);
-return votedIdOpt.orElse(-1) == nodeId;
+/**
+ * Return if the replica has voted for the given candidate.
+ *
+ * A replica has voted for a candidate if all of the following are true:
+ * 1. the node's id and voted id match and
+ * 2. if the voted directory id is set, it matches the node's directory id
+ *
+ * @param nodeKey the id and directory id of the replica
+ * @return true when the arguments match, otherwise false
+ */
+public boolean isVotedCandidate(ReplicaKey nodeKey) {
+if (nodeKey.id() < 0) {
+throw new IllegalArgumentException("Invalid node key " + nodeKey);
+} else if (!votedKey.isPresent()) {
+return false;
+} else if (votedKey.get().id() != nodeKey.id()) {
+return false;
+} else if (!votedKey.get().directoryId().isPresent()) {
+// when the persisted voted uuid is not present assume that we 
voted for this candidate;
+// this happends when the kraft version is 0.

Review Comment:
   Fixed.
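
For readers following the quoted diff, the matching rule described in its Javadoc can be sketched as a standalone class. This is an illustration under assumptions, not the PR's code: `VotedCandidateSketch` and its nested `Key` type are hypothetical stand-ins for `ElectionState` and `ReplicaKey`, `java.util.UUID` stands in for Kafka's `Uuid`, and the final directory-id comparison is inferred from the Javadoc because the quoted diff is cut off before that branch.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical stand-in for the voted-candidate check; not the PR's ElectionState. */
final class VotedCandidateSketch {

    /** Hypothetical stand-in for ReplicaKey: a node id plus an optional directory id. */
    static final class Key {
        final int id;
        final Optional<UUID> directoryId;

        Key(int id, Optional<UUID> directoryId) {
            this.id = id;
            this.directoryId = directoryId;
        }
    }

    static boolean isVotedCandidate(Optional<Key> votedKey, Key nodeKey) {
        if (nodeKey.id < 0) {
            throw new IllegalArgumentException("Invalid node id " + nodeKey.id);
        }
        if (!votedKey.isPresent()) {
            return false; // no vote was recorded
        }
        Key voted = votedKey.get();
        if (voted.id != nodeKey.id) {
            return false; // voted for a different node
        }
        if (!voted.directoryId.isPresent()) {
            return true; // no persisted directory id: the node id match is enough
        }
        // Assumed from the Javadoc: a persisted directory id must match the node's.
        return voted.directoryId.equals(nodeKey.directoryId);
    }
}
```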






Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595578267


##
raft/src/main/java/org/apache/kafka/raft/ElectionState.java:
##
@@ -16,46 +16,44 @@
  */
 package org.apache.kafka.raft;
 
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.raft.generated.QuorumStateData;
+import org.apache.kafka.raft.internals.ReplicaKey;
 
 /**
  * Encapsulate election state stored on disk after every state change.
  */
-public class ElectionState {
-public final int epoch;
-public final OptionalInt leaderIdOpt;
-public final OptionalInt votedIdOpt;
+final public class ElectionState {
+private static int unknownLeaderId = -1;

Review Comment:
   Yeah, I had this using capital letters and underscore but our style guide 
complained about the name pattern. E.g.
   ```
   [ant:checkstyle] [ERROR] 
kafka/raft/src/main/java/org/apache/kafka/raft/ElectionState.java:33:24: Name 
'UNKNOWN_LEADER_ID' must match pattern '^[a-z][a-zA-Z0-9]*$'. 
[StaticVariableName]
   ```
   
   I suspect that the style guideline is different if the field is `private`. I 
haven't investigated this in detail. Do you mind if I address this in a 
different PR?
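
One possible explanation, offered as an assumption rather than a confirmed diagnosis: Checkstyle's `StaticVariableName` check applies to static fields that are not `final`, and its default pattern is the `^[a-z][a-zA-Z0-9]*$` shown in the error. Static `final` fields are covered by the separate `ConstantName` check, whose default pattern accepts upper case with underscores. Whether Kafka's Checkstyle configuration keeps these defaults is not verified here. The class below is a hypothetical illustration of the two declarations side by side.

```java
/** Hypothetical illustration of the two naming rules; not the PR's ElectionState. */
final class ElectionStateNamingSketch {

    // static but not final: checked by StaticVariableName, so camelCase is expected.
    private static int unknownLeaderId = -1;

    // static and final: checked by ConstantName, so UPPER_SNAKE_CASE is expected.
    private static final int UNKNOWN_LEADER_ID = -1;

    static int mutableSentinel() {
        return unknownLeaderId;
    }

    static int constantSentinel() {
        return UNKNOWN_LEADER_ID;
    }
}
```

If that assumption holds, adding `final` to the sentinel fields would allow the upper-case names without changing the rule set.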






Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595568368


##
raft/src/main/java/org/apache/kafka/raft/CandidateState.java:
##
@@ -51,14 +54,27 @@ public class CandidateState implements EpochState {
 protected CandidateState(
 Time time,
 int localId,
+Uuid localDirectoryId,

Review Comment:
   Yes. Thanks for catching this.






Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-09 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1595568006


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -64,6 +64,43 @@ public Optional voterAddress(int voter, 
String listener) {
 .flatMap(voterNode -> voterNode.address(listener));
 }
 
+/**
+ * Returns if the node is a voter in the set of voters.
+ *
+ * If the voter set includes the directory id, the {@code nodeKey} 
directory id must match the
+ * directory id specified by the voter set.
+ *
+ * If the voter set doesn't include the directory id ({@code 
Optional.empty()}), a node is in
+ * the voter set as long as the node id matches. The directory id is not 
checked.
+ *
+ * @param nodeKey the node's id and directory id
+ * @return true if the node is a voter in the voter set, otherwise false
+ */
+public boolean isVoter(ReplicaKey nodeKey) {
+VoterNode node = voters.get(nodeKey.id());
+if (node != null) {
+if (node.voterKey().directoryId().isPresent()) {
+return 
node.voterKey().directoryId().equals(nodeKey.directoryId());
+} else {
+// configured voter set doesn't an uuid so it is a voter as 
long as the node id

Review Comment:
   Fixed.
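
The membership rule in the quoted Javadoc can be sketched as a small standalone class. This is a hedged illustration: `VoterSetSketch` and its nested `Key` type are hypothetical stand-ins for `VoterSet` and `ReplicaKey`, `java.util.UUID` stands in for Kafka's `Uuid`, and the branches after the truncated comment are inferred from the Javadoc rather than copied from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical stand-in for the voter-set membership check; not the PR's VoterSet. */
final class VoterSetSketch {

    /** Hypothetical stand-in for ReplicaKey: a node id plus an optional directory id. */
    static final class Key {
        final int id;
        final Optional<UUID> directoryId;

        Key(int id, Optional<UUID> directoryId) {
            this.id = id;
            this.directoryId = directoryId;
        }
    }

    private final Map<Integer, Key> voters = new HashMap<>();

    VoterSetSketch add(Key voter) {
        voters.put(voter.id, voter);
        return this;
    }

    boolean isVoter(Key nodeKey) {
        Key voter = voters.get(nodeKey.id);
        if (voter == null) {
            return false; // unknown node id
        }
        if (voter.directoryId.isPresent()) {
            // The voter set pins a directory id, so the node's directory id must match it.
            return voter.directoryId.equals(nodeKey.directoryId);
        }
        // The voter set has no directory id for this voter: the node id match is enough.
        return true;
    }
}
```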






[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-09 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845004#comment-17845004
 ] 

Justine Olshan commented on KAFKA-16692:


I'm working on a test to recreate the issue, and then seeing if the proposed 
fix helps.

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a Kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this:
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and, digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First, we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and are potentially skipping the expected 
> _latestUsableVersion_ check. I am wondering if it is possible that, because 
> _discoverBrokerVersions_ is set to _false_ for the network client of the 
> {_}AddPartitionsToTxnManager{_}, it skips fetching {_}NodeApiVersions{_}? I 
> can see that we create the network client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging this issue. Happy to 
> provide any additional information needed.
>  
>  
>  



--
This 
