[jira] [Commented] (KAFKA-16616) refactor mergeWith in MetadataSnapshot

2024-05-14 Thread Cao Manh Dat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846515#comment-17846515
 ] 

Cao Manh Dat commented on KAFKA-16616:
--

Hi [~alyssahuang], can I work on this item?

> refactor mergeWith in MetadataSnapshot
> --
>
> Key: KAFKA-16616
> URL: https://issues.apache.org/jira/browse/KAFKA-16616
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Alyssa Huang
>Priority: Minor
>
> Right now we keep track of topic ids and partition metadata to add/update
> separately in mergeWith (i.e. two maps passed as arguments). This means we
> iterate over topic metadata twice, which could be costly when we're dealing
> with a large number of updates.
> `updatePartitionLeadership`, which calls `mergeWith`, does something similar
> (it generates a map of topic ids to update in a loop separate from the list of
> partition metadata to update) and should be refactored as well.
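The following is a minimal single-pass sketch of the idea. It uses a simplified model rather than the real `MetadataSnapshot` API: `PartitionUpdate` and `SnapshotSketch` are hypothetical stand-ins, and partitions are keyed by a `topic-partition` string. Each update contributes both its topic id and its partition entry in the same loop, so callers would no longer need to build a separate topic-id map first.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for per-partition metadata; the real Kafka types differ.
final class PartitionUpdate {
    final String topic;
    final UUID topicId; // may be null if the update carries no topic id
    final int partition;
    final int leaderId;

    PartitionUpdate(String topic, UUID topicId, int partition, int leaderId) {
        this.topic = topic;
        this.topicId = topicId;
        this.partition = partition;
        this.leaderId = leaderId;
    }
}

// Hypothetical immutable snapshot: topic ids by topic name, partitions keyed by "topic-partition".
final class SnapshotSketch {
    final Map<String, UUID> topicIds;
    final Map<String, PartitionUpdate> partitions;

    SnapshotSketch(Map<String, UUID> topicIds, Map<String, PartitionUpdate> partitions) {
        this.topicIds = topicIds;
        this.partitions = partitions;
    }

    // Single pass over the updates: topic ids and partition metadata are merged in the
    // same loop instead of being pre-computed into two separate maps by the caller.
    SnapshotSketch mergeWith(Collection<PartitionUpdate> updates) {
        Map<String, UUID> mergedTopicIds = new HashMap<>(topicIds);
        Map<String, PartitionUpdate> mergedPartitions = new HashMap<>(partitions);
        for (PartitionUpdate update : updates) {
            if (update.topicId != null) {
                mergedTopicIds.put(update.topic, update.topicId);
            }
            mergedPartitions.put(update.topic + "-" + update.partition, update);
        }
        // The receiver is left untouched; a new snapshot is returned.
        return new SnapshotSketch(mergedTopicIds, mergedPartitions);
    }
}
```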



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14588 [3/N] ConfigCommandTest rewritten in java [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on code in PR #15930:
URL: https://github.com/apache/kafka/pull/15930#discussion_r1601032507


##
core/src/test/java/kafka/admin/ConfigCommandUnitTest.java:
##
@@ -878,6 +886,486 @@ public void shouldNotDescribeUserScramCredentialsWithEntityDefaultUsingBootstrap
         verifyUserScramCredentialsNotDescribed(defaultUserOpt);
     }
 
+    @Test
+    public void shouldAddTopicConfigUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
+            "--entity-name", "my-topic",
+            "--entity-type", "topics",
+            "--alter",
+            "--add-config", "a=b,c=d"));
+
+        KafkaZkClient zkClient = mock(KafkaZkClient.class);
+        when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new Properties());
+
+        ConfigCommand.alterConfigWithZk(null, createOpts, new AdminZkClient(zkClient, scala.None$.empty()) {
+            @Override
+            public void changeTopicConfig(String topic, Properties configChange) {
+                assertEquals("my-topic", topic);
+                assertEquals("b", configChange.get("a"));
+                assertEquals("d", configChange.get("c"));
+            }
+        });
+    }
+
+    @Test

Review Comment:
   How about using `ValueSource`?
   
   ```java
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void shouldAlterTopicConfig(boolean file) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-14 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1601000794


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
 
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request match those of the consumer group.
+     *
+     * @param group                 The ConsumerGroup.
+     * @param member                The ConsumerGroupMember.
+     * @param requestGenerationId   The generation id from the request.
+     * @param requestProtocolType   The protocol type from the request.
+     * @param requestProtocolName   The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(

Review Comment:
   nit: It may be better to split this one into two methods: one to validate
the generation, and another to validate the protocol type and name.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
+    private void throwIfGenerationIdOrProtocolUnmatched(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        int requestGenerationId,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (member.memberEpoch() != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to the group epoch %d from the consumer group %s.",

Review Comment:
   nit: `group epoch` -> `member epoch`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
+    private void throwIfGenerationIdOrProtocolUnmatched(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        int requestGenerationId,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (member.memberEpoch() != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to the group epoch %d from the consumer group %s.",
+                    requestGenerationId, group.groupEpoch(), group.groupId())

Review Comment:
   nit: `group.groupEpoch()` -> `member.memberEpoch()`.
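Taken together, the three nits above could look roughly like the following sketch. Each concern gets its own method, and the generation is compared against the member epoch and reported as the member epoch. `MemberStub` and the two exception classes are hypothetical stand-ins for `ConsumerGroupMember` and the `Errors.*.exception(...)` calls in the PR. Two further points are assumptions, not what the PR does: mapping a protocol mismatch to an "inconsistent group protocol" error, and treating a null request value as "not provided".

```java
// Hypothetical stand-in for the exception produced by Errors.ILLEGAL_GENERATION.
class IllegalGenerationStubException extends RuntimeException {
    IllegalGenerationStubException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for an "inconsistent group protocol" error (assumed mapping).
class InconsistentProtocolStubException extends RuntimeException {
    InconsistentProtocolStubException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for ConsumerGroupMember with only the fields these checks need.
final class MemberStub {
    final String memberId;
    final int memberEpoch;
    final String protocolType;
    final String protocolName;

    MemberStub(String memberId, int memberEpoch, String protocolType, String protocolName) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
        this.protocolType = protocolType;
        this.protocolName = protocolName;
    }
}

final class SyncGroupChecks {
    // Generation check only: compares against, and reports, the member epoch.
    static void throwIfGenerationIdUnmatched(MemberStub member, int requestGenerationId) {
        if (member.memberEpoch != requestGenerationId) {
            throw new IllegalGenerationStubException(String.format(
                "The request generation id %d is not equal to the member epoch %d of member %s.",
                requestGenerationId, member.memberEpoch, member.memberId));
        }
    }

    // Protocol check only: a null request value is treated as "not provided" (assumption).
    static void throwIfProtocolUnmatched(MemberStub member, String requestProtocolType, String requestProtocolName) {
        if (requestProtocolType != null && !requestProtocolType.equals(member.protocolType)) {
            throw new InconsistentProtocolStubException(String.format(
                "The protocol type %s from member %s does not match %s.",
                requestProtocolType, member.memberId, member.protocolType));
        }
        if (requestProtocolName != null && !requestProtocolName.equals(member.protocolName)) {
            throw new InconsistentProtocolStubException(String.format(
                "The protocol name %s from member %s does not match %s.",
                requestProtocolName, member.memberId, member.protocolName));
        }
    }
}
```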



##
group-coordina

[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-14 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846508#comment-17846508
 ] 

Luke Chen commented on KAFKA-16760:
---

[~soarez], I'm really sorry, I can't believe I didn't commit my change to the
branch yesterday. I just updated the branch, and the test reliably failed in my
environment after 3 runs. Please give it another try.
I'd like to know whether this is expected behavior or a bug.

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing an alterLogDirRequest, it gets an error NONE result. But actually, the
> alterLogDir never happened, and these errors were logged:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: this is running in KRaft mode, so the log message mentioning LeaderAndIsr is wrong.
> This can be reproduced in this
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] by running
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923





Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1600992789


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -66,96 +65,66 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
 
 private final String baseDisplayName;
 private final ClusterConfig clusterConfig;
-private final AtomicReference clusterReference;
-private final AtomicReference zkReference;
 private final boolean isCombined;
 
 public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig, boolean isCombined) {
 this.baseDisplayName = baseDisplayName;
 this.clusterConfig = clusterConfig;
-this.clusterReference = new AtomicReference<>();
-this.zkReference = new AtomicReference<>();
 this.isCombined = isCombined;
 }
 
 @Override
 public String getDisplayName(int invocationIndex) {
 String clusterDesc = clusterConfig.nameTags().entrySet().stream()
-.map(Object::toString)
-.collect(Collectors.joining(", "));
+.map(Object::toString)

Review Comment:
   Please avoid those unrelated changes. Smaller is better.



##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -252,7 +220,13 @@ public Admin createAdminClient(Properties configOverrides) {
 public void start() {

Review Comment:
   In this method we should always call `format` first. That is a big convenience
for users.
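The following is a rough sketch of the suggested shape. It assumes an idempotent `format()` guarded by a null check (as in the merge suggested further down) and a `start()` that always formats first. `FakeTestKit` and `ClusterHandleSketch` are hypothetical stand-ins for `KafkaClusterTestKit` and `RaftClusterInvocationContext`. Only the call ordering and the `started` flag are modelled.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for KafkaClusterTestKit; it only records what happened.
final class FakeTestKit {
    int formatCount = 0;
    boolean started = false;

    void format() {
        formatCount++;
    }

    void startup() {
        started = true;
    }
}

final class ClusterHandleSketch {
    private FakeTestKit testKit; // null until format() builds it
    private final AtomicBoolean started = new AtomicBoolean(false);

    // Idempotent: builds and formats only on the first call.
    void format() {
        if (testKit == null) {
            testKit = new FakeTestKit();
            testKit.format();
        }
    }

    // Always formats first, so callers never need a separate format() step.
    void start() {
        format();
        if (started.compareAndSet(false, true)) {
            testKit.startup();
        }
    }

    boolean isStarted() {
        return started.get();
    }

    FakeTestKit testKit() {
        return testKit;
    }
}
```

Because `format()` is a no-op after the first call, a test that calls it explicitly before `start()` behaves the same as one that does not.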



##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -171,39 +140,39 @@ public Optional controllerListenerName() {
 @Override
 public Collection controllerSocketServers() {
 return controllers()
-.map(ControllerServer::socketServer)
-.collect(Collectors.toList());
+.map(ControllerServer::socketServer)
+.collect(Collectors.toList());
 }
 
 @Override
 public SocketServer anyBrokerSocketServer() {
 return brokers()
-.map(BrokerServer::socketServer)
-.findFirst()
-.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+.map(BrokerServer::socketServer)

Review Comment:
   Ditto. Please revert those changes.



##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -284,24 +258,51 @@ public void startBroker(int brokerId) {
 @Override
 public void waitForReadyBrokers() throws InterruptedException {
 try {
-clusterReference.get().waitForReadyBrokers();
+clusterTestKit.waitForReadyBrokers();
 } catch (ExecutionException e) {
 throw new AssertionError("Failed while waiting for brokers to become ready", e);
 }
 }
 
-private BrokerServer findBrokerOrThrow(int brokerId) {
-return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
-}
-
 public Stream brokers() {
-return clusterReference.get().brokers().values().stream();
+return clusterTestKit.brokers().values().stream();
 }
 
 public Stream controllers() {
-return clusterReference.get().controllers().values().stream();
+return clusterTestKit.controllers().values().stream();
 }
 
+public void format() throws Exception {

Review Comment:
   `format` and `buildAndFormatCluster` can be merged, for example:
   ```java
   public void format() {
       if (this.clusterTestKit == null) {
           try {
               KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(new TestKitNodes.Builder()
                   .setBootstrapMetadataVersion(clusterConfig.metadataVersion())
                   .setCombined(isCombined)
                   .setNumBrokerNodes(clusterConfig.numBrokers())
                   .setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
                   .setPerServerProperties(clusterConfig.perServerOverrideProperties())
                   .setNumControllerNodes(clusterConfig.numControllers()).build());
   
               if (Boolean.parseBoolean(clusterConfig.serverProperties()
                       .getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                   this.embeddedZookeeper = new EmbeddedZookeeper();
                   builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port()));
               }
   
               // Copy properties into the Test

Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on code in PR #15933:
URL: https://github.com/apache/kafka/pull/15933#discussion_r1600990933


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java:
##
@@ -960,6 +666,14 @@ private void expectConvertWriteRead(final String configKey, final Schema valueSc
 });
 }
 
+private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,

Review Comment:
   Please remove this unused function.






Re: [PR] MINOR: fix flaky testRecordThreadIdleRatioTwoThreads test [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on PR #15937:
URL: https://github.com/apache/kafka/pull/15937#issuecomment-2111647759

   @jeffkbkim It seems the test case is still a bit flaky, see:
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15933/2/testReport/org.apache.kafka.coordinator.group.runtime/MultiThreadedEventProcessorTest/Build___JDK_8_and_Scala_2_12___testRecordThreadIdleRatio__/
   
   Do you have time to dig into it?





Re: [PR] MINOR: Rename `Record` to `CoordinatorRecord` [kafka]

2024-05-14 Thread via GitHub


chia7712 merged PR #15949:
URL: https://github.com/apache/kafka/pull/15949





Re: [PR] KAFKA-16763: Upgrade to scala 2.12.19 and scala 2.13.14 [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on PR #15958:
URL: https://github.com/apache/kafka/pull/15958#issuecomment-2111632709

   ```
   [2024-05-15T01:51:49.255Z] > Task :core:compileScala
   
   [2024-05-15T01:51:49.255Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/controller/KafkaController.scala:1202:18: method setOrCreatePartitionReassignment in class KafkaZkClient is deprecated
   
   [2024-05-15T01:51:50.590Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/zk/ZkData.scala:518:14: class LegacyPartitionAssignment in object ReassignPartitionsZNode is deprecated
   
   [2024-05-15T01:51:50.590Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/zk/ZkData.scala:524:24: class LegacyPartitionAssignment in object ReassignPartitionsZNode is deprecated
   
   [2024-05-15T01:51:50.590Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/zk/ZkData.scala:533:23: class LegacyPartitionAssignment in object ReassignPartitionsZNode is deprecated
   
   [2024-05-15T01:51:50.590Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/zk/ZkData.scala:533:49: class LegacyPartitionAssignment in object ReassignPartitionsZNode is deprecated
   
   ```
   
   Please fix the build error.





Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on PR #15917:
URL: https://github.com/apache/kafka/pull/15917#issuecomment-2111631318

   @showuon Please take a look if you have free time. I'd like to migrate all
storage tests to the new test infra after this PR gets merged.





Re: [PR] KAFKA-16686: Wait for given offset in TopicBasedRemoteLogMetadataManagerTest [kafka]

2024-05-14 Thread via GitHub


chia7712 merged PR #15885:
URL: https://github.com/apache/kafka/pull/15885





Re: [PR] MINOR: Fix TargetAssignmentBuilderBenchmark [kafka]

2024-05-14 Thread via GitHub


chia7712 merged PR #15950:
URL: https://github.com/apache/kafka/pull/15950





Re: [PR] MINOR: Fix warnings in streams javadoc [kafka]

2024-05-14 Thread via GitHub


chia7712 merged PR #15955:
URL: https://github.com/apache/kafka/pull/15955





[jira] [Assigned] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14

2024-05-14 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16763:
--

Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> Upgrade to scala 2.12.19 and scala 2.13.14
> --
>
> Key: KAFKA-16763
> URL: https://issues.apache.org/jira/browse/KAFKA-16763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
>
> scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19)
>  
> scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14)





[jira] [Resolved] (KAFKA-16671) Revisit SessionedProtocolIntegrationTest.ensureInternalEndpointIsSecured

2024-05-14 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16671.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Revisit SessionedProtocolIntegrationTest.ensureInternalEndpointIsSecured
> 
>
> Key: KAFKA-16671
> URL: https://issues.apache.org/jira/browse/KAFKA-16671
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> Looped it 1000 times locally, and all runs passed. Let's enable the test to see what
> happens in our CI.





Re: [PR] KAFKA-16671: enable test for ensureInternalEndpointIsSecured [kafka]

2024-05-14 Thread via GitHub


chia7712 merged PR #15868:
URL: https://github.com/apache/kafka/pull/15868





[PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]

2024-05-14 Thread via GitHub


appchemist opened a new pull request, #15961:
URL: https://github.com/apache/kafka/pull/15961

   - If any non-retriable exceptions were encountered during a metadata update,
clear and throw the exception in the new consumer's poll (like the legacy consumer does).
   - If an invalid topic is discovered in metadata in the new consumer's poll,
throw InvalidTopicException.
   - Enable the `KafkaConsumerTest.testSubscriptionOnInvalidTopic` test.
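A minimal sketch of the "clear and throw" behaviour described above. It assumes the background thread records a non-retriable metadata error and the application thread checks for it at the start of each poll. `MetadataErrorHolder` and `InvalidTopicStubException` are hypothetical names, not the classes used in this PR. Clearing with `getAndSet(null)` makes each recorded error surface exactly once.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for org.apache.kafka.common.errors.InvalidTopicException.
class InvalidTopicStubException extends RuntimeException {
    InvalidTopicStubException(String message) {
        super(message);
    }
}

// Holds the last non-retriable metadata error until the application thread surfaces it.
final class MetadataErrorHolder {
    private final AtomicReference<RuntimeException> pending = new AtomicReference<>();

    // Called from the background/metadata path when a non-retriable error is seen.
    void record(RuntimeException error) {
        pending.set(error);
    }

    // Called at the start of poll(): atomically clears the error, then throws it.
    void maybeThrow() {
        RuntimeException error = pending.getAndSet(null);
        if (error != null) {
            throw error;
        }
    }
}
```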
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread appchemist (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846483#comment-17846483
 ] 

appchemist commented on KAFKA-16764:


Hi [~lianetm], I would like to take this issue.

> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example. The new consumer does
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer
> does.
> The legacy consumer achieves this by checking for metadata exceptions on
> every iteration of the ConsumerNetworkClient (see
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]).
> This is probably what makes
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
> fail for the new consumer. Once this bug is fixed, we should be able to
> enable that test for the new consumer.





Re: [PR] KAFKA-15050: format the prompts in the quickstart [kafka]

2024-05-14 Thread via GitHub


joobisb commented on PR #13862:
URL: https://github.com/apache/kafka/pull/13862#issuecomment-2111521992

   @tombentley @flavray 
   
   I have addressed all the comments. Can we merge this?





Re: [PR] MINOR: update leaderAndEpoch before initializing metadata publishers [kafka]

2024-05-14 Thread via GitHub


github-actions[bot] commented on PR #15366:
URL: https://github.com/apache/kafka/pull/15366#issuecomment-2111516055

   This PR is being marked as stale since it has not had any activity in 90
days. If you would like to keep this PR alive, please ask a committer for
review. If the PR has merge conflicts, please update it with the latest from
trunk (or the appropriate release branch). If this PR is no longer valid or
desired, please feel free to close it. If no activity occurs in the next 30
days, it will be automatically closed.





Re: [PR] KAFKA-16763: Upgrade to scala 2.12.19 and scala 2.13.14 [kafka]

2024-05-14 Thread via GitHub


m1a2st commented on PR #15958:
URL: https://github.com/apache/kafka/pull/15958#issuecomment-2111504233

   @chia7712, please take a look at this PR. Thank you.





[PR] [DO NOT MERGE] KIP-924 rack info API option 1: add #taskIdToPartitionsRackIds and PartitionRackIds [kafka]

2024-05-14 Thread via GitHub


ableegoldman opened a new pull request, #15960:
URL: https://github.com/apache/kafka/pull/15960

   (no comment)





[PR] [DO NOT MERGE] KIP-924 rack info API option 2: introduce TaskInfo with complete task metadata [kafka]

2024-05-14 Thread via GitHub


ableegoldman opened a new pull request, #15959:
URL: https://github.com/apache/kafka/pull/15959

   (no comment)





[jira] [Commented] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition

2024-05-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846475#comment-17846475
 ] 

Greg Harris commented on KAFKA-16765:
-

This is also a bug in EchoServer:
[https://github.com/apache/kafka/blob/cb968845ecb3cb0982182d9dd437ecf652fe38d3/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java#L76-L81]
and ServerShutdownTest:
[https://github.com/apache/kafka/blob/cb968845ecb3cb0982182d9dd437ecf652fe38d3/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala#L274-L275],
except that those don't require a race condition to happen.

> NioEchoServer leaks accepted SocketChannel instances due to race condition
> --
>
> Key: KAFKA-16765
> URL: https://issues.apache.org/jira/browse/KAFKA-16765
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Priority: Minor
>
> The NioEchoServer has an AcceptorThread that calls accept() to open new 
> SocketChannel instances and insert them into the `newChannels` List, and a 
> main thread that drains the `newChannels` List and moves them to the 
> `socketChannels` List.
> During shutdown, the serverSocketChannel is closed, which causes both threads 
> to exit their while loops. It is possible for the NioEchoServer main thread 
> to sense the serverSocketChannel close and terminate before the Acceptor 
> thread does, and for the Acceptor thread to put a SocketChannel in 
> `newChannels` before terminating. This instance is never closed by either 
> thread, because it is never moved to `socketChannels`.
> A precise execution order that has this leak is:
> 1. NioEchoServer thread locks `newChannels`.
> 2. Acceptor thread accept() completes, and the SocketChannel is created.
> 3. Acceptor thread blocks waiting for the `newChannels` lock.
> 4. NioEchoServer thread releases the `newChannels` lock and does some
> processing.
> 5. NioEchoServer#close() is called, which closes the serverSocketChannel.
> 6. NioEchoServer thread checks serverSocketChannel.isOpen() and then
> terminates.
> 7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel
> to `newChannels`.
> 8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
> 9. NioEchoServer#close() stops blocking now that both other threads have
> terminated.
> The end result is that the leaked socket is left open in the `newChannels` 
> list at the end of close(), which is incorrect.
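One way to close the leak is sketched below, under these assumptions: `close()` joins both threads first, then drains `newChannels` one last time and closes everything it finds. A channel added in step 7 is therefore still closed. `HandoffSketch` is a hypothetical, single-threaded model of the two lists (channels are plain `Closeable`s). It is neither the actual NioEchoServer code nor the fix that was merged.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the hand-off between the acceptor and the main thread.
final class HandoffSketch {
    private final List<Closeable> newChannels = new ArrayList<>();    // filled by the acceptor
    private final List<Closeable> socketChannels = new ArrayList<>(); // owned by the main thread

    // Acceptor side: hand over a freshly accepted channel.
    void handOff(Closeable channel) {
        synchronized (newChannels) {
            newChannels.add(channel);
        }
    }

    // Main-thread side: move pending channels into the owned list.
    void drain() {
        synchronized (newChannels) {
            socketChannels.addAll(newChannels);
            newChannels.clear();
        }
    }

    // Must run only after both worker threads have been joined. The final drain picks up
    // any channel handed off after the main thread stopped draining (steps 6-7 above).
    void closeAll() throws IOException {
        drain();
        for (Closeable channel : socketChannels) {
            channel.close();
        }
        socketChannels.clear();
    }
}
```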





[jira] [Updated] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition

2024-05-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16768:

Description: 
The SocketServer has threads for Acceptors and Processors. These threads 
communicate via Processor#accept/Processor#configureNewConnections and the 
`newConnections` queue.

During shutdown, the Acceptor and Processors are each stopped by setting 
shouldRun to false, and then shutdown proceeds asynchronously in all instances 
together. This leads to a race condition where an Acceptor accepts a 
SocketChannel and queues it to a Processor, but that Processor instance has 
already started shutting down and has already drained the newConnections queue.

KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely 
different implementation but has the same flaw.

An example execution order that includes this leak:
1. Acceptor#accept() is called, and a new SocketChannel is accepted.
2. Acceptor#assignNewConnection() begins.
3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor
and attached Processor instances.
4. Processor#run() checks the shouldRun variable and exits the loop.
5. Processor#closeAll() executes and drains the `newConnections` queue.
6. Processor#run() returns and the Processor thread terminates.
7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the
SocketChannel to `newConnections`.
8. Acceptor#assignNewConnection() returns.
9. Acceptor#run() checks the shouldRun variable and exits the loop, and the
Acceptor thread terminates.
10. Acceptor#close() joins all of the terminated threads and returns.

At the end of this sequence, there are still open SocketChannel instances in 
newConnections, which are then considered leaked.
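Another way to close this window is to make the Processor's "closed" transition and the drain of `newConnections` atomic with respect to `accept()`, and to have the acceptor close any channel the processor refuses. The sketch below is hypothetical: the class, the boolean return value of `accept()` and the `closeQuietly` helper are invented for illustration, and `Closeable` stands in for SocketChannel. It is not the actual SocketServer code or the fix adopted for this issue.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified Processor; Closeable stands in for SocketChannel.
final class ProcessorSketch {
    private final Queue<Closeable> newConnections = new ArrayDeque<>();
    private boolean closed = false; // guarded by "this"

    // Called from the Acceptor thread. Returns false if closeAll() has already drained the
    // queue, in which case ownership of the channel stays with the caller.
    synchronized boolean accept(Closeable channel) {
        if (closed) {
            return false;
        }
        newConnections.add(channel);
        return true;
    }

    // Called from the Processor thread on shutdown. Setting the flag and draining under the
    // same lock means no accept() can enqueue a channel after the drain (step 7 above).
    synchronized int closeAll() {
        closed = true;
        int drained = 0;
        Closeable channel;
        while ((channel = newConnections.poll()) != null) {
            closeQuietly(channel);
            drained++;
        }
        return drained;
    }

    // Acceptor side of the hand-off: close the channel itself if the processor refused it.
    static void assignNewConnection(ProcessorSketch processor, Closeable channel) {
        if (!processor.accept(channel)) {
            closeQuietly(channel);
        }
    }

    static void closeQuietly(Closeable channel) {
        try {
            channel.close();
        } catch (IOException e) {
            // Ignored: best-effort cleanup during shutdown.
        }
    }
}
```

With this hand-off, the late Processor#accept() in step 7 is refused and the Acceptor closes the SocketChannel instead of leaving it in `newConnections`.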

  was:
The SocketServer has threads for Acceptors and Processors. These threads 
communicate via Processor#accept/Processor#configureNewConnections and the 
`newConnections` queue.

During shutdown, the Acceptor and Processors are each stopped by setting 
shouldRun to false, and then shutdown proceeds asynchronously in all instances 
together. This leads to a race condition where an Acceptor accepts a 
SocketChannel and queues it to a Processor, but that Processor instance has 
already started shutting down and has already drained the newConnections queue.


> SocketServer leaks accepted SocketChannel instances due to race condition
> -
>
> Key: KAFKA-16768
> URL: https://issues.apache.org/jira/browse/KAFKA-16768
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Priority: Major
>
> The SocketServer has threads for Acceptors and Processors. These threads 
> communicate via Processor#accept/Processor#configureNewConnections and the 
> `newConnections` queue.
> During shutdown, the Acceptor and Processors are each stopped by setting 
> shouldRun to false, and then shutdown proceeds asynchronously in all 
> instances together. This leads to a race condition where an Acceptor accepts 
> a SocketChannel and queues it to a Processor, but that Processor instance has 
> already started shutting down and has already drained the newConnections 
> queue.
> KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely 
> different implementation but has the same flaw.
> An example execution order that includes this leak:
> 1. Acceptor#accept() is called, and a new SocketChannel is accepted.
> 2. Acceptor#assignNewConnection() begins
> 3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor 
> and attached Processor instances
> 4. Processor#run() checks the shouldRun variable, and exits the loop
> 5. Processor#closeAll() executes, and drains the `newConnections` variable
> 6. Processor#run() returns and the Processor thread terminates
> 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the 
> SocketChannel to `newConnections`
> 8. Acceptor#assignNewConnection() returns
> 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the 
> Acceptor thread terminates.
> 10. Acceptor#close() joins all of the terminated threads, and returns
> At the end of this sequence, there are still open SocketChannel instances in 
> newConnections, which are then considered leaked.





[jira] [Created] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition

2024-05-14 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16768:
---

 Summary: SocketServer leaks accepted SocketChannel instances due to race condition
 Key: KAFKA-16768
 URL: https://issues.apache.org/jira/browse/KAFKA-16768
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.8.0
Reporter: Greg Harris


The SocketServer has threads for Acceptors and Processors. These threads 
communicate via Processor#accept/Processor#configureNewConnections and the 
`newConnections` queue.

During shutdown, the Acceptor and Processors are each stopped by setting 
shouldRun to false, and then shutdown proceeds asynchronously in all instances 
together. This leads to a race condition where an Acceptor accepts a 
SocketChannel and queues it to a Processor, but that Processor instance has 
already started shutting down and has already drained the newConnections queue.





[jira] [Commented] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14

2024-05-14 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846471#comment-17846471
 ] 

黃竣陽 commented on KAFKA-16763:
-

I will handle this issue.

> Upgrade to scala 2.12.19 and scala 2.13.14
> --
>
> Key: KAFKA-16763
> URL: https://issues.apache.org/jira/browse/KAFKA-16763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19)
>  
> scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14)





Re: [PR] KAFKA-15045: (KIP-924 pt. 3) Implement KafkaStreamsAssignment [kafka]

2024-05-14 Thread via GitHub


ableegoldman commented on PR #15944:
URL: https://github.com/apache/kafka/pull/15944#issuecomment-2111377214

   Merged to trunk. I'll update the KIP and send out a notice about the change 
from interface to class and the new APIs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 3) Implement KafkaStreamsAssignment [kafka]

2024-05-14 Thread via GitHub


ableegoldman merged PR #15944:
URL: https://github.com/apache/kafka/pull/15944





Re: [PR] KAFKA-15045: (KIP-924 pt. 3) Implement KafkaStreamsAssignment [kafka]

2024-05-14 Thread via GitHub


ableegoldman commented on PR #15944:
URL: https://github.com/apache/kafka/pull/15944#issuecomment-2111376581

   The test env seems pretty unstable right now but we have at least one clean 
build for each java version if you look at the two latest runs. All test 
failures are unrelated as well. Seems safe to merge





Re: [PR] KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState [kafka]

2024-05-14 Thread via GitHub


ableegoldman commented on PR #15920:
URL: https://github.com/apache/kafka/pull/15920#issuecomment-2111374003

   Merged to trunk





Re: [PR] KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState [kafka]

2024-05-14 Thread via GitHub


ableegoldman commented on PR #15920:
URL: https://github.com/apache/kafka/pull/15920#issuecomment-2111373393

   The test env seems pretty unstable right now but we have at least one clean 
build for each java version if you look at the two latest runs. All test 
failures are unrelated as well. Seems safe to merge





Re: [PR] KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState [kafka]

2024-05-14 Thread via GitHub


ableegoldman merged PR #15920:
URL: https://github.com/apache/kafka/pull/15920





[jira] [Comment Edited] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846467#comment-17846467
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-16361 at 5/15/24 12:24 AM:
--

Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. Probably the same issue that [~flashmouse] found in 
KAFKA-15170

Fortunately I just merged that fix and cherrypicked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is 
just a day from KIP freeze which means if all goes well, it will be available 
in a little over a month.

If you need an immediate resolution in the meantime then you have two options:

1) disable rack-awareness which will effectively make the assignor just skip 
over the buggy code

2) if you can build from source and don't require an official release, just 
cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch 
with whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment since that 
contains untested code, but you can at least run your test again using the 
latest trunk build if you want to make sure that it fixes the issue you're 
experiencing. I'm pretty confident it will though
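For option 1, a minimal sketch, assuming (as described in KIP-881) that the client-side rack-aware logic in the sticky assignors only kicks in when consumers advertise a rack through the `client.rack` config: dropping that key from the consumer properties should make the assignor skip the rack-aware path. The helper class below is hypothetical; only the `client.rack`, `bootstrap.servers` and `partition.assignment.strategy` config names are real consumer settings.

```java
import java.util.Properties;

// Hypothetical helper. "client.rack" is the real consumer config that advertises the
// consumer's rack; the assumption is that without it no rack-aware assignment is attempted.
final class RackAwarenessToggle {
    static final String CLIENT_RACK_CONFIG = "client.rack";

    // Returns a copy of the consumer config with the rack removed. The original is untouched,
    // and other settings such as partition.assignment.strategy are kept as-is.
    static Properties withoutRackAwareness(Properties consumerConfig) {
        Properties copy = new Properties();
        copy.putAll(consumerConfig);
        copy.remove(CLIENT_RACK_CONFIG);
        return copy;
    }
}
```

One caveat worth checking before doing this: `client.rack` is also what consumers use for fetching from the closest replica (KIP-392), so removing it may disable that as well until the fixed release is deployed.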


was (Author: ableegoldman):
Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. Probably the same issue that [~flashmouse] found in 
[KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170]

 

Fortunately I just merged that fix and cherrypicked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is 
just a day from KIP freeze which means if all goes well, it will be available 
in a little over a month.

 

If you need an immediate resolution in the meantime then you have two options:

1) disable rack-awareness which will effectively make the assignor just skip 
over the buggy code

2) if you can build from source and don't require an official release, just 
cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch 
with whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment since that 
contains untested code, but you can at least run your test again using the 
latest trunk build if you want to make sure that it fixes the issue you're 
experiencing. I'm pretty confident it will though

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.1, 3.7.0, 3.6.1
>Reporter: Luke D
>Priority: Major
> Attachments: illegalstateexception.log
>
>
> In some low topic replication scenarios, the rack-aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on 
> application implementation). While uncommon, the error is deterministic, and 
> so it persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1 and 3.6.1. We have reproduced it 
> locally in a test case in 3.6.1 and 3.7.0 (we did not try 3.5.1, but it is 
> likely reproducible there as well). 
>  
> Here is the error and stack from our test case against 3.7.0
> {code:java}
> We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 
> (id: 3 rack: rack

[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846467#comment-17846467
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16361:


Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. Probably the same issue that [~flashmouse] found in 
[KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170]

 

Fortunately I just merged that fix and cherrypicked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is 
just a day from KIP freeze which means if all goes well, it will be available 
in a little over a month.

 

If you need an immediate resolution in the meantime then you have two options:

1) disable rack-awareness which will effectively make the assignor just skip 
over the buggy code

2) if you can build from source and don't require an official release, just 
cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch 
with whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment since that 
contains untested code, but you can at least run your test again using the 
latest trunk build if you want to make sure that it fixes the issue you're 
experiencing. I'm pretty confident it will though

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.1, 3.7.0, 3.6.1
>Reporter: Luke D
>Priority: Major
> Attachments: illegalstateexception.log
>
>
> In some low topic replication scenarios, the rack-aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on 
> application implementation). While uncommon, the error is deterministic, and 
> so it persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1 and 3.6.1. We have reproduced it 
> locally in a test case in 3.6.1 and 3.7.0 (we did not try 3.5.1, but it is 
> likely reproducible there as well). 
>  
> Here is the error and stack from our test case against 3.7.0
> {code:java}
> We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 
> (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: 
> rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader 
> = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 
> 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = 

[jira] [Updated] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15170:
---
Fix Version/s: 3.8.0
   3.7.1

> CooperativeStickyAssignor cannot adjust assignment correctly
> 
>
> Key: KAFKA-15170
> URL: https://issues.apache.org/jira/browse/KAFKA-15170
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Assignee: li xiangyuan
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the 
> assignment when all consumers in the group subscribe to the same topic list, 
> but it fails to add every partition whose owner moves to another consumer to 
> ``partitionsWithMultiplePreviousOwners``.
>  
> The reason is that assignOwnedPartitions does not add partitions that have a 
> rack mismatch with their previous owner to allRevokedPartitions, and only 
> partitions in that list are added to partitionsWithMultiplePreviousOwners.
>  
> In a cooperative rebalance, partitions that have changed owner must be removed 
> from the final assignment, or consumption will behave incorrectly. I have 
> already raised a PR, please take a look, thanks.
>  
> [https://github.com/apache/kafka/pull/13965]
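The cooperative rule in the last paragraph can be sketched as follows: a partition that would move from one member to another is withheld from the new owner in this round (so its old owner revokes it) and only handed out in a follow-up rebalance. This is a simplified illustration with invented names and plain strings for partitions, not the CooperativeStickyAssignor's actual code; the bug described above appears to be a case where such a moving partition was not recognised.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the cooperative "revoke first, reassign later" rule.
final class CooperativeAdjustmentSketch {
    // previousOwners: partition -> member that owned it before this rebalance
    // target: member -> partitions the assignor would like that member to own
    // Returns a copy of target without partitions that would change owner in this round.
    static Map<String, Set<String>> withholdTransfers(Map<String, String> previousOwners,
                                                      Map<String, Set<String>> target) {
        Map<String, Set<String>> adjusted = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : target.entrySet()) {
            String member = entry.getKey();
            Set<String> kept = new TreeSet<>();
            for (String partition : entry.getValue()) {
                String previousOwner = previousOwners.get(partition);
                // Keep partitions that were unowned or already owned by this member.
                if (previousOwner == null || previousOwner.equals(member)) {
                    kept.add(partition);
                }
            }
            adjusted.put(member, kept);
        }
        return adjusted;
    }
}
```

For example, if member A owned t-0 and t-1 and the target gives t-1 and a new partition t-2 to B, B receives only t-2 in this round; A no longer has t-1 in its assignment, revokes it, and the follow-up rebalance can give it to B.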





[jira] [Resolved] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15170.

Resolution: Fixed

> CooperativeStickyAssignor cannot adjust assignment correctly
> 
>
> Key: KAFKA-15170
> URL: https://issues.apache.org/jira/browse/KAFKA-15170
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Assignee: li xiangyuan
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the 
> assignment when all consumers in the group subscribe to the same topic list, 
> but it fails to add every partition whose owner moves to another consumer to 
> ``partitionsWithMultiplePreviousOwners``.
>  
> The reason is that assignOwnedPartitions does not add partitions that have a 
> rack mismatch with their previous owner to allRevokedPartitions, and only 
> partitions in that list are added to partitionsWithMultiplePreviousOwners.
>  
> In a cooperative rebalance, partitions that have changed owner must be removed 
> from the final assignment, or consumption will behave incorrectly. I have 
> already raised a PR, please take a look, thanks.
>  
> [https://github.com/apache/kafka/pull/13965]





Re: [PR] KAFKA-15170: CooperativeStickyAssignor may fail adjust assignment [kafka]

2024-05-14 Thread via GitHub


ableegoldman commented on PR #13965:
URL: https://github.com/apache/kafka/pull/13965#issuecomment-2111359637

   Merged to trunk and cherrypicked back to 3.7





Re: [PR] KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning [kafka]

2024-05-14 Thread via GitHub


ableegoldman commented on PR #15416:
URL: https://github.com/apache/kafka/pull/15416#issuecomment-2111358899

   FYI I cherrypicked this back to 3.7 while cherrypicking back another sticky 
assignor fix. Should be in the 3.7.1 release





[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16277:
---
Fix Version/s: 3.7.1

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Assignee: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
> Attachments: image-2024-02-19-13-00-28-306.png
>
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>  
> Of note, `topic-1` gets approximately 10 times more messages through it than 
> `topic-2`. 
>  
> Both of these topics are consumed by a single application, single consumer 
> group, which scales under load. Each member of the consumer group subscribes 
> to both topics. The `partition.assignment.strategy` being used is 
> `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The 
> application may start with one consumer. It consumes all partitions from both 
> topics.
>  
> The problem begins when the application scales up to two consumers. What is 
> seen is that all partitions from `topic-1` go to one consumer, and all 
> partitions from `topic-2` go to the other consumer. In the case with one 
> topic receiving more messages than the other, this results in a very 
> imbalanced group where one consumer is receiving 10x the traffic of the other 
> due to partition assignment.
>  
> This is the issue being seen in our cluster at the moment. See this graph of 
> the number of messages being processed by each consumer as the group scales 
> from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
>  * With two consumers, the partitions for a topic all go to a single consumer 
> each
>  * With three consumers, the partitions for a topic are split between two 
> consumers each
>  * With four consumers, the partitions for a topic are split between three 
> consumers each
>  * The total number of messages being processed by each consumer in the group 
> is very imbalanced throughout the entire period
>  
> With regard to the number of _partitions_ being assigned to each consumer, 
> the group is balanced. However, the assignment appears to be biased so that 
> partitions from the same topic go to the same consumer. In our scenario, this 
> leads to very undesirable partition assignment.
>  
> I question if the behaviour of the assignor should be revised, so that each 
> topic has its partitions maximally spread across all available members of the 
> consumer group. In the above scenario, this would result in much more even 
> distribution of load. The behaviour would then be:
>  * With two consumers, 6 partitions from each topic go to each consumer
>  * With three consumers, 4 partitions from each topic go to each consumer
>  * With four consumers, 3 partitions from each topic go to each consumer
>  
> Of note, we only saw this behaviour after migrating to the 
> `CooperativeStickyAssignor`. It was not an issue with the default partition 
> assignment strategy.
>  
> It is possible this is intended behaviour, in which case, what is the 
> preferred workaround for our scenario? If we decide to go ahead with the 
> update to `CooperativeStickyAssignor`, our current workaround may be to 
> limit our consumers so they only subscribe to one topic, and have two consumer threads 
> per instance of the application.  
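The distribution the reporter proposes, with each topic's partitions spread over all members, can be illustrated with a simple per-topic round-robin deal. This is a hypothetical sketch that ignores stickiness and rack awareness entirely, so it is not how CooperativeStickyAssignor works; it only shows the 6/4/3 per-topic split expected for two 12-partition topics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: no stickiness, no rack awareness.
final class PerTopicSpreadSketch {
    // Deals each topic's partitions round-robin across members. The rotation continues from
    // where the previous topic stopped, so per-member totals also differ by at most one.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> members) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        int next = 0;
        // Iterate topics in a deterministic (sorted) order.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                assignment.get(members.get(next)).add(topic.getKey() + "-" + p);
                next = (next + 1) % members.size();
            }
        }
        return assignment;
    }

    // Counts how many partitions of the given topic a member received.
    static long countForTopic(List<String> owned, String topic) {
        return owned.stream().filter(tp -> tp.startsWith(topic + "-")).count();
    }
}
```

With two, three and four members this gives 6, 4 and 3 partitions of each topic per member, matching the behaviour listed in the issue.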





Re: [PR] KAFKA-15170: CooperativeStickyAssignor may fail adjust assignment [kafka]

2024-05-14 Thread via GitHub


ableegoldman merged PR #13965:
URL: https://github.com/apache/kafka/pull/13965





Re: [PR] KAFKA-15170: CooperativeStickyAssignor may fail adjust assignment [kafka]

2024-05-14 Thread via GitHub


ableegoldman commented on PR #13965:
URL: https://github.com/apache/kafka/pull/13965#issuecomment-2111341726

   Test failures are unrelated, merging to trunk





Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-14 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1600781735


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##


Review Comment:
   I added a test to `AsyncKafkaConsumerTest`.






Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-14 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1600781636


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##


Review Comment:
   Oh, the code definitely has smells! 😉
   
   I added a test to `ConsumerNetworkThread`.






Re: [PR] improve log description of QuorumController [kafka]

2024-05-14 Thread via GitHub


chickenchickenlove commented on code in PR #15926:
URL: https://github.com/apache/kafka/pull/15926#discussion_r160061


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -165,6 +165,8 @@ static ControllerResult recordsForNonEmptyLog(
 throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was " +
 "created in KRaft mode.");
 }
+logMessageBuilder
+.append("since this is a de-novo KRaft cluster.");

Review Comment:
   This is better 👍 
   I made a new commit to apply your comments.
   Please take another look when you have free time 🙇‍♂️ 






Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-14 Thread via GitHub


gaurav-narula commented on code in PR #15945:
URL: https://github.com/apache/kafka/pull/15945#discussion_r1600768789


##
metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Timeout(value = 40)
+public class BrokerRegistrationTrackerTest {
+static final Uuid INCARNATION_ID = Uuid.fromString("jyjLbk31Tpa53pFrU9Y-Ng");
+
+static final Uuid A = Uuid.fromString("Ahw3vXfnThqeZbb7HD1w6Q");
+
+static final Uuid B = Uuid.fromString("BjOacT0OTNqIvUWIlKhahg");
+
+static final Uuid C = Uuid.fromString("CVHi_iv2Rvy5_1rtPdasfg");
+
+static class BrokerRegistrationTrackerTestContext {
+AtomicInteger numCalls = new AtomicInteger(0);
+BrokerRegistrationTracker tracker = new BrokerRegistrationTracker(1,
+Arrays.asList(B, A), () -> numCalls.incrementAndGet());
+
+MetadataImage image = MetadataImage.EMPTY;
+
+void onMetadataUpdate(MetadataDelta delta) {
+MetadataProvenance provenance = new MetadataProvenance(0, 0, 0);
+image = delta.apply(provenance);
+LogDeltaManifest manifest = new LogDeltaManifest.Builder().
+provenance(provenance).
+leaderAndEpoch(LeaderAndEpoch.UNKNOWN).
+numBatches(1).
+elapsedNs(1).
+numBytes(1).
+build();
+tracker.onMetadataUpdate(delta, image, manifest);
+}
+
+MetadataDelta newDelta() {
+return new MetadataDelta.Builder().
+setImage(image).
+build();
+}
+}
+
+@Test
+public void testTrackerName() {
+BrokerRegistrationTrackerTestContext ctx  = new BrokerRegistrationTrackerTestContext();
+assertEquals("BrokerRegistrationTracker(id=1)", ctx.tracker.name());
+}
+
+@Test
+public void testMetadataVersionUpdateWithoutRegistrationDoesNothing() {
+BrokerRegistrationTrackerTestContext ctx  = new BrokerRegistrationTrackerTestContext();
+MetadataDelta delta = ctx.newDelta();
+delta.replay(new FeatureLevelRecord().
+setName(MetadataVersion.FEATURE_NAME).
+setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel()));
+ctx.onMetadataUpdate(delta);
+assertEquals(0, ctx.numCalls.get());
+}
+
+@Test
+public void testBrokerUpdateWithoutNewMvDoesNothing() {
+BrokerRegistrationTrackerTestContext ctx  = new BrokerRegistrationTrackerTestContext();
+MetadataDelta delta = ctx.newDelta();
+delta.replay(new RegisterBrokerRecord().
+setBrokerId(1).
+setIncarnationId(INCARNATION_ID).
+setLogDirs(Arrays.asList(A, B, C)));
+ctx.onMetadataUpdate(delta);
+assertEquals(0, ctx.numCalls.get());
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testBrokerUpdateWithNewMv(boolean jbodMv) {
+BrokerRegistrationTrackerTestContext ctx  = new BrokerRegistrationTrackerTestContext();
+MetadataDelta delta = ctx.newDelta();
+delta.replay(new RegisterBrokerRecord().
+setBrokerId(1).
+setIncarnationId(INCARNATION_ID).
+setLogDirs(Arrays.asList()));
+delta.replay(new Featur

Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


junrao commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600483273


##
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+@Test
+public void testCompressionDecompression() throws IOException {
+GzipCompression.Builder builder = Compression.gzip();
+byte[] data = "data".getBytes(StandardCharsets.UTF_8);

Review Comment:
   Should we test something bigger, like more than 512 bytes, so that it covers the out.flush() logic?
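A minimal JDK-only sketch of the larger-payload round trip suggested above. It does not use Kafka's `GzipCompression` builder: the hypothetical `LeveledGzipOutputStream` sets the level on the `Deflater` that `GZIPOutputStream` inherits, and the payload is 4096 pseudorandom bytes (well above 512), so the compressed stream is not trivially small either.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Random;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: a gzip stream whose compression level is configurable.
class LeveledGzipOutputStream extends GZIPOutputStream {
    LeveledGzipOutputStream(OutputStream out, int bufferSize, int level) throws IOException {
        super(out, bufferSize);
        // `def` is the protected Deflater inherited from DeflaterOutputStream;
        // no data has been deflated yet, so the level applies to the whole stream.
        def.setLevel(level);
    }
}

class GzipRoundTrip {
    static final int[] LEVELS = {Deflater.BEST_SPEED, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION};

    // Pseudorandom bytes barely compress, so the gzip output also exceeds 512 bytes.
    static byte[] randomPayload(int size, long seed) {
        byte[] data = new byte[size];
        new Random(seed).nextBytes(data);
        return data;
    }

    // Compresses `data` at `level`, then decompresses it and returns the result.
    static byte[] roundTrip(byte[] data, int level) {
        try {
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            // 512-byte deflater buffer: compressed output is drained in several chunks.
            try (OutputStream out = new LeveledGzipOutputStream(compressed, 512, level)) {
                out.write(data);
                out.flush();
            }
            try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
                return in.readAllBytes();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```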



##
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+@Test
+public void testCompressionDecompression() throws IOException {
+GzipCompression.Builder builder = Compression.gzip();
+byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+GzipCompression compression = builder.level(level).build();
+ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(data);
+out.flush();
+}
+bufferStream.buffer().flip();
+
+try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) {
+byte[] result = new byte[data.length];
+int read = inputStream.read(result);
+assertEquals(data.length, read);
+assertArrayEquals(data, result);
+}
+}
+}
+

[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16764:
--
Fix Version/s: 3.8.0

> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example. The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does. 
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably why 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fails for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.
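A minimal sketch of the pattern described above. It uses hypothetical stand-in classes rather than Kafka's real `Metadata` or consumer types: a background thread records a fatal metadata error, and `poll()` checks for it on every call and rethrows it on the application thread. The method name `maybeThrowAnyException` loosely mirrors the legacy client but should be treated as an assumption here.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for org.apache.kafka.common.errors.InvalidTopicException.
class SketchInvalidTopicException extends RuntimeException {
    SketchInvalidTopicException(String message) {
        super(message);
    }
}

// Hypothetical holder for errors discovered while processing metadata responses.
class SketchMetadataErrors {
    private final AtomicReference<RuntimeException> pending = new AtomicReference<>();

    // Called from the background thread; keeps the first error until it is surfaced.
    void recordError(RuntimeException error) {
        pending.compareAndSet(null, error);
    }

    // Called from the application thread; throws (and clears) any recorded error.
    void maybeThrowAnyException() {
        RuntimeException error = pending.getAndSet(null);
        if (error != null) {
            throw error;
        }
    }
}

// Hypothetical consumer facade: every poll() checks for metadata errors first.
class SketchConsumer {
    private final SketchMetadataErrors metadataErrors = new SketchMetadataErrors();

    SketchMetadataErrors metadataErrors() {
        return metadataErrors;
    }

    List<String> poll() {
        metadataErrors.maybeThrowAnyException();
        return List.of(); // record fetching is out of scope for this sketch
    }
}
```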



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16767) KRaft should track HWM outside the log layer

2024-05-14 Thread Jira
José Armando García Sancio created KAFKA-16767:
--

 Summary: KRaft should track HWM outside the log layer
 Key: KAFKA-16767
 URL: https://issues.apache.org/jira/browse/KAFKA-16767
 Project: Kafka
  Issue Type: Improvement
  Components: kraft
Reporter: José Armando García Sancio


The current implementation of KRaft tracks the HWM using the log layer 
implementation. The log layer has an invariant where the HWM <= LEO. This means 
that the log layer always sets the HWM to the minimum of HWM and LEO.

This has the side effect that the local KRaft replica reports a HWM that is much 
smaller than the leader's HWM when the replica starts with an empty log, e.g. a 
new broker or the kafka-metadata-shell.
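A minimal sketch contrasting the two approaches, with hypothetical class names (not KRaft's actual types): the log-layer invariant clamps the HWM to the local LEO, so a replica with an empty log reports 0, whereas a tracker kept outside the log layer can report the leader's HWM unchanged.

```java
import java.util.OptionalLong;

// Hypothetical model of the log-layer invariant HWM <= LEO described in the ticket.
class LogLayerHwm {
    static long clamp(long leaderHwm, long localLeo) {
        return Math.min(leaderHwm, localLeo);
    }
}

// Hypothetical tracker that stores the leader's HWM independently of the local log.
class ReplicaHwmTracker {
    private OptionalLong highWatermark = OptionalLong.empty();

    void onLeaderHighWatermark(long leaderHwm) {
        highWatermark = OptionalLong.of(leaderHwm);
    }

    OptionalLong highWatermark() {
        return highWatermark;
    }
}
```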





[jira] [Commented] (KAFKA-16637) AsyncKafkaConsumer removes offset fetch responses from cache too aggressively

2024-05-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846446#comment-17846446
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], sorry I had missed your last question. The new group 
rebalance protocol from KIP-848 is supported in KRaft mode only.

> AsyncKafkaConsumer removes offset fetch responses from cache too aggressively
> -
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not work well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here





[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16766:
---
Description: 
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

The same situation exists for beginningOffsets and endOffsets. All three functions show the 
same timeout message in the LegacyConsumer (defined 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
 but none of them has a clear message in the AsyncKafkaConsumer, so we should fix all three.

With the fix, we should write tests for each function, like the ones defined for 
the LegacyConsumer 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
 Note that we would need different tests, added to AsyncKafkaConsumerTest, 
given that the legacy consumer does not issue a FindCoordinator request in this 
case but the AsyncConsumer does, and the current tests do not account for that 
when matching requests/responses.

  was:
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

After the fix, we should be able to write a test like the 
[testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246]
 that exists for the legacy consumer. Note that we would need a different test 
given that the legacy consumer does not issue a FindCoordinator request in this 
case but the AsyncConsumer does, so the test would have to account for that 
when matching requests/responses.


> New consumer offsetsForTimes timeout exception does not have the proper 
> message
> ---
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
> will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
> with the following as message: "java.util.concurrent.TimeoutException". 
> We should provide a clearer message, and I would even say we keep the same 
> message that the LegacyConsumer shows in this case, ex: "Failed to get 
> offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer 
> when offsetsForTimes result times out 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
>  and propagate it with the message specific to offsetsForTimes.
> The same situation exists for beginningOffsets and endOffsets. All three functions show 
> the same timeout message in the LegacyConsumer (defined 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
>  but none of them has a clear message in the AsyncKafkaConsumer, so we should fix all three.
> With the fix, we should write

[jira] [Created] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16766:
--

 Summary: New consumer offsetsForTimes timeout exception does not 
have the proper message
 Key: KAFKA-16766
 URL: https://issues.apache.org/jira/browse/KAFKA-16766
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
 Fix For: 3.8.0


If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

After the fix, we should be able to write a test like the 
[testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246]
 that exists for the legacy consumer. Note that we would need a different test 
given that the legacy consumer does not issue a FindCoordinator request in this 
case but the AsyncConsumer does, so the test would have to account for that 
when matching requests/responses.
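A minimal sketch of the proposed fix, using a plain `CompletableFuture` and a hypothetical `SketchKafkaTimeoutException` standing in for `org.apache.kafka.common.errors.TimeoutException`: the `java.util.concurrent.TimeoutException` is caught and rethrown with the legacy-style message quoted above. Reporting the configured timeout rather than the elapsed time is an assumption of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException.
class SketchKafkaTimeoutException extends RuntimeException {
    SketchKafkaTimeoutException(String message, Throwable cause) {
        super(message, cause);
    }
}

class OffsetsTimeoutSketch {
    // Waits for an offsets result, translating a bare timeout into a descriptive error.
    static <T> T awaitOffsets(CompletableFuture<T> result, long timeoutMs) {
        try {
            return result.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Same wording the ticket quotes from the legacy consumer; the cause is kept.
            throw new SketchKafkaTimeoutException("Failed to get offsets by times in " + timeoutMs + "ms", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
    }
}
```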





[jira] [Created] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition

2024-05-14 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16765:
---

 Summary: NioEchoServer leaks accepted SocketChannel instances due 
to race condition
 Key: KAFKA-16765
 URL: https://issues.apache.org/jira/browse/KAFKA-16765
 Project: Kafka
  Issue Type: Bug
  Components: core, unit tests
Affects Versions: 3.8.0
Reporter: Greg Harris


The NioEchoServer has an AcceptorThread that calls accept() to open new 
SocketChannel instances and insert them into the `newChannels` List, and a main 
thread that drains the `newChannels` List and moves them to the 
`socketChannels` List.

During shutdown, the serverSocketChannel is closed, which causes both threads 
to exit their while loops. It is possible for the NioEchoServer main thread to 
sense the serverSocketChannel close and terminate before the Acceptor thread 
does, and for the Acceptor thread to put a SocketChannel in `newChannels` 
before terminating. This instance is never closed by either thread, because it 
is never moved to `socketChannels`.

A precise execution order that has this leak is:
1. NioEchoServer thread locks `newChannels`.
2. Acceptor thread accept() completes, and the SocketChannel is created
3. Acceptor thread blocks waiting for the `newChannels` lock
4. NioEchoServer thread releases the `newChannels` lock and does some processing
5. NioEchoServer#close() is called, which closes the serverSocketChannel
6. NioEchoServer thread checks serverSocketChannel.isOpen() and then terminates
7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel 
to `newChannels`.
8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
9. NioEchoServer#close() stops blocking now that both other threads have 
terminated.

The end result is that the leaked socket is left open in the `newChannels` list 
at the end of close(), which is incorrect.
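A minimal sketch of one possible fix, assuming `close()` can run cleanup after both worker threads have been joined: drain whatever is left in `newChannels` and close it together with `socketChannels`. The class and method names are hypothetical, and plain `Closeable` objects stand in for `SocketChannel`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of the NioEchoServer channel bookkeeping.
class ChannelBookkeeping {
    private final List<Closeable> newChannels = new ArrayList<>();    // written by the acceptor thread
    private final List<Closeable> socketChannels = new ArrayList<>(); // owned by the main thread

    // Acceptor thread: publish a freshly accepted channel.
    void accepted(Closeable channel) {
        synchronized (newChannels) {
            newChannels.add(channel);
        }
    }

    // Main thread: move pending channels into the list it manages.
    void drainNewChannels() {
        synchronized (newChannels) {
            socketChannels.addAll(newChannels);
            newChannels.clear();
        }
    }

    // close(), after both threads have terminated: a final drain picks up a channel
    // added in step 7 of the interleaving, then every channel is closed.
    void closeAll() {
        drainNewChannels();
        IOException first = null;
        for (Closeable channel : socketChannels) {
            try {
                channel.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e; // keep closing the rest, report the first failure
                }
            }
        }
        socketChannels.clear();
        if (first != null) {
            throw new UncheckedIOException(first);
        }
    }
}
```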





[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16764:
---
Description: 
A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example. The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])

This is probably why 
[testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
 fails for the new consumer. Once this bug is fixed, we should be able to 
enable that test for the new consumer.

  was:
A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example. The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])


> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example. The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does. 
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably why 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fails for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.





[jira] [Created] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16764:
--

 Summary: New consumer should throw InvalidTopicException on poll 
when invalid topic in metadata
 Key: KAFKA-16764
 URL: https://issues.apache.org/jira/browse/KAFKA-16764
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans


A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example. The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-14 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1600452206


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -116,6 +125,71 @@ public void start(Map props) {
 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
 }
 
+// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+class CheckpointRecordHandler {
+private volatile KafkaException lastLoggedErrorReadingCheckpoints = null;
+
+void handle(Throwable error, ConsumerRecord cpRecord) {
+// See KafkaBasedLog.poll : only KafkaException can be passed as error
+if (error instanceof KafkaException) {
+// only log once
+if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+log.error("Error loading Checkpoint topic", error);
+lastLoggedErrorReadingCheckpoints = (KafkaException) error;
+}
+
+if (error instanceof RetriableException) {
+return;
+} else {
+throw (KafkaException) error;
+}
+} else { // error is null
+lastLoggedErrorReadingCheckpoints = null;
+Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);

Review Comment:
   deserialization can fail due to bad data in the topic
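One way to address this, sketched with hypothetical names: wrap the deserialization in a try/catch so that a malformed record is logged and skipped rather than propagating out of the record callback. Whether to skip or to fail the initialization is a design choice for the PR; the generic `Function` here stands in for `Checkpoint.deserializeRecord`.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical record handler that tolerates values it cannot deserialize.
class TolerantRecordHandler<R, T> {
    private static final Logger LOG = Logger.getLogger(TolerantRecordHandler.class.getName());

    private final Function<R, T> deserializer;
    private final Consumer<T> sink;
    private int skipped = 0;

    TolerantRecordHandler(Function<R, T> deserializer, Consumer<T> sink) {
        this.deserializer = deserializer;
        this.sink = sink;
    }

    void handle(R record) {
        T value;
        try {
            value = deserializer.apply(record);
        } catch (RuntimeException e) {
            // Bad data in the topic: log and skip instead of aborting the whole load.
            skipped++;
            LOG.log(Level.WARNING, "Skipping record that could not be deserialized", e);
            return;
        }
        sink.accept(value);
    }

    int skipped() {
        return skipped;
    }
}
```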



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -116,6 +125,71 @@ public void start(Map props) {
 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
 }
 
+// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+class CheckpointRecordHandler {
+private volatile KafkaException lastLoggedErrorReadingCheckpoints = null;
+
+void handle(Throwable error, ConsumerRecord cpRecord) {
+// See KafkaBasedLog.poll : only KafkaException can be passed as error
+if (error instanceof KafkaException) {
+// only log once
+if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+log.error("Error loading Checkpoint topic", error);
+lastLoggedErrorReadingCheckpoints = (KafkaException) error;
+}
+
+if (error instanceof RetriableException) {
+return;
+} else {
+throw (KafkaException) error;
+}
+} else { // error is null
+lastLoggedErrorReadingCheckpoints = null;
+Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
+if (consumerGroups.contains(cp.consumerGroupId())) {
+Map cps = checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored -> new HashMap<>());
+cps.put(cp.topicPartition(), cp);
+}
+}
+}
+}
+
+CheckpointRecordHandler handler = new CheckpointRecordHandler();
+TopicAdmin cpAdmin = null;
+KafkaBasedLog previousCheckpoints = null;
+
+try {
+cpAdmin = new TopicAdmin(
+config.targetAdminConfig("checkpoint-target-admin"),
+config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")));
+
+previousCheckpoints = KafkaBasedLog.withExistingClients(
+config.checkpointsTopic(),
+MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)),
+null,
+cpAdmin,
+(error, cpRecord) -> handler.handle(error, cpRecord),
+Time.SYSTEM,
+ignored -> { },
+topicPartition -> topicPartition.partition() == 0);
+
+log.info("Starting loading Checkpoint topic : {}", config.checkpointsTopic());
+previousCheckpoints.start(true);
+previousCheckpoints.stop();
+log.info("Finished loading Checkpoint topic : {}", config.checkpointsTopic());
+log.debug("Initial checkpointsPerConsumerGroup : {}", checkpointsPerConsumerGroup);
+return true;
+}  catch (KafkaException kexc) 

Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-14 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600493799


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
 return EMPTY_RESULT;
 }
 
+/**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group          The ConsumerGroup.
+ * @param context        The request context.
+ * @param request        The actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+private CoordinatorResult classicGroupSyncToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+
+ConsumerGroupMember member;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(request.memberId(), false);
+} else {
+member = group.staticMember(instanceId);
+if (member == null) {
+throw new UnknownMemberIdException(
+String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+);
+}
+throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+}
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdOrProtocolUnmatched(
+group,
+member,
+request.generationId(),
+request.protocolType(),
+request.protocolName()
+);
+
+cancelConsumerGroupSyncTimeout(groupId, memberId);
+//scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout());

Review Comment:
   I just merged it!
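For readers following the review, the member-resolution step at the start of `classicGroupSyncToConsumerGroup` can be modelled in isolation. The sketch below is hypothetical: `SyncMember`, `SyncGroupModel` and `SyncLookupException` are stand-ins, not the coordinator's `ConsumerGroupMember`, `ConsumerGroup` or exception classes, and the fencing check assumes that `throwIfInstanceIdIsFenced` rejects a request whose member id differs from the one currently registered for that instance id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a consumer group member; only the two ids matter here.
record SyncMember(String memberId, String instanceId) { }

// Hypothetical error type; the coordinator itself throws UnknownMemberIdException
// or a fenced-instance-id error instead.
class SyncLookupException extends RuntimeException {
    SyncLookupException(String message) {
        super(message);
    }
}

final class SyncGroupModel {
    private final String groupId;
    private final Map<String, SyncMember> membersById = new HashMap<>();
    private final Map<String, SyncMember> membersByInstanceId = new HashMap<>();

    SyncGroupModel(String groupId) {
        this.groupId = groupId;
    }

    void add(SyncMember member) {
        membersById.put(member.memberId(), member);
        if (member.instanceId() != null) {
            membersByInstanceId.put(member.instanceId(), member);
        }
    }

    /** Resolves the member a SyncGroup request refers to, following the branches in the diff. */
    SyncMember resolve(String memberId, String instanceId) {
        if (instanceId == null) {
            // Dynamic member: look up by member id; a sync never creates a member.
            SyncMember member = membersById.get(memberId);
            if (member == null) {
                throw new SyncLookupException(String.format(
                    "Member %s is not a member of group %s.", memberId, groupId));
            }
            return member;
        }
        // Static member: look up by group.instance.id.
        SyncMember member = membersByInstanceId.get(instanceId);
        if (member == null) {
            throw new SyncLookupException(String.format(
                "Member with instance id %s is not a member of group %s.", instanceId, groupId));
        }
        // Assumed fencing rule: the instance id now belongs to a different member id.
        if (!member.memberId().equals(memberId)) {
            throw new SyncLookupException(String.format(
                "Instance id %s is fenced; it is owned by member %s.", instanceId, member.memberId()));
        }
        return member;
    }
}
```

Keeping the two lookups separate mirrors the diff: dynamic members are addressed by member id, while static members are addressed by `group.instance.id` and are additionally checked for fencing.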



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add classic member session timeout to ClassicMemberMetadata [kafka]

2024-05-14 Thread via GitHub


dajac merged PR #15921:
URL: https://github.com/apache/kafka/pull/15921





Re: [PR] MINOR: revisit LogValidatorTest#checkRecompression [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on PR #15948:
URL: https://github.com/apache/kafka/pull/15948#issuecomment-2110831638

   @junrao thanks for all your reviews :)





Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on PR #15838:
URL: https://github.com/apache/kafka/pull/15838#issuecomment-2110829824

   > The new test seems to be failing: [testReduceNumNetworkThreads() – kafka.server.KRaftClusterTest](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15838/1/tests) can you check on it?
   
   see https://github.com/apache/kafka/pull/15838#discussion_r1585911858





Re: [PR] KAFKA-16686: Wait for given offset in TopicBasedRemoteLogMetadataManagerTest [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on PR #15885:
URL: https://github.com/apache/kafka/pull/15885#issuecomment-2110828020

   >  looks like we still suffer from thread leaks in CI :( I've rebased from trunk to trigger CI again
   
   I have noticed that too. so sad :(





Re: [PR] MINOR: revisit LogValidatorTest#checkRecompression [kafka]

2024-05-14 Thread via GitHub


junrao commented on code in PR #15948:
URL: https://github.com/apache/kafka/pull/15948#discussion_r1600406699


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -497,9 +497,11 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-// Both V2 and V1 has single branch in the record when compression is enable, and hence their shallow OffsetOfMaxTimestamp
-// is the last offset of the single branch
-assertEquals(1, records.batches().asScala.size)
+// V2 has single batch, and other versions has many single-record batches
+assertEquals(if (magic >= RecordBatch.MAGIC_VALUE_V2) 1 else 3, records.batches().asScala.size)
+// Both V2 and V1 has single batch in the record when compression is enable, and hence their shallow OffsetOfMaxTimestamp

Review Comment:
   in the record => in the validated records



##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -497,9 +497,11 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-// Both V2 and V1 has single branch in the record when compression is enable, and hence their shallow OffsetOfMaxTimestamp
-// is the last offset of the single branch
-assertEquals(1, records.batches().asScala.size)
+// V2 has single batch, and other versions has many single-record batches
+assertEquals(if (magic >= RecordBatch.MAGIC_VALUE_V2) 1 else 3, records.batches().asScala.size)

Review Comment:
   Should we move this assert to immediately after `records` is created?
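The updated assertion ties the expected batch count to the magic value: 1 for `RecordBatch.MAGIC_VALUE_V2` and later, 3 otherwise. A minimal Java sketch of that rule follows; the helper name `expectedBatchCount` is hypothetical, and generalizing the literal `3` to "one batch per record" is an assumption based on the diff's comment about single-record batches for older versions.

```java
final class BatchCountExpectation {
    // Same value as RecordBatch.MAGIC_VALUE_V2 in Kafka's record format.
    static final byte MAGIC_VALUE_V2 = 2;

    /**
     * Expected number of batches in the input records: one batch for V2 and later,
     * otherwise (assumed) one single-record batch per record.
     */
    static int expectedBatchCount(byte magic, int recordCount) {
        return magic >= MAGIC_VALUE_V2 ? 1 : recordCount;
    }

    public static void main(String[] args) {
        System.out.println(expectedBatchCount((byte) 2, 3)); // prints 1
        System.out.println(expectedBatchCount((byte) 1, 3)); // prints 3
    }
}
```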






Re: [PR] improve log description of QuorumController [kafka]

2024-05-14 Thread via GitHub


mumrah commented on code in PR #15926:
URL: https://github.com/apache/kafka/pull/15926#discussion_r1600407308


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -165,6 +165,8 @@ static ControllerResult recordsForNonEmptyLog(
 throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was " +
 "created in KRaft mode.");
 }
+logMessageBuilder
+.append("since this is a de-novo KRaft cluster.");

Review Comment:
   How about "This is expected because this is a de-novo KRaft cluster." ?






[PR] KAFKA-16759: Handle telemetry push response while terminating [kafka]

2024-05-14 Thread via GitHub


AndrewJSchofield opened a new pull request, #15957:
URL: https://github.com/apache/kafka/pull/15957

   When client telemetry is configured in a cluster, Kafka producers and consumers push metrics to the brokers periodically. There is a special push of metrics that occurs when the client is terminating. A state machine in the client telemetry reporter controls its behaviour in different states.
   
   Sometimes, when a client was terminating, it attempted an invalid state transition from TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED when it received a response to a PushTelemetry RPC. This was essentially harmless because the state transition did not occur, but it did cause unsightly log lines to be generated. This PR checks for the terminating states when the response is received and simply remains in the current state.
   
   I added a test to validate the state management in this case. The test actually passes before the code change in this PR, but with unsightly log lines.
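   The fix described above amounts to a guard in the response handler. The following sketch is a simplified, hypothetical model: `TelemetryReporterSketch` and its transition table are not the real client telemetry reporter, and a list of strings stands in for its log output. Without the early return, a response arriving in `TERMINATING_PUSH_IN_PROGRESS` would hit the refused transition and add a warning; with it, the state is simply left unchanged.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical subset of telemetry reporter states, named after those in the PR.
enum TelemetryState {
    PUSH_NEEDED, PUSH_IN_PROGRESS,
    TERMINATING_PUSH_NEEDED, TERMINATING_PUSH_IN_PROGRESS, TERMINATED
}

final class TelemetryReporterSketch {
    private static final Set<TelemetryState> TERMINATING = EnumSet.of(
        TelemetryState.TERMINATING_PUSH_NEEDED,
        TelemetryState.TERMINATING_PUSH_IN_PROGRESS,
        TelemetryState.TERMINATED);

    // Stand-in for the reporter's log output.
    final List<String> warnings = new ArrayList<>();
    private TelemetryState state;

    TelemetryReporterSketch(TelemetryState initial) {
        this.state = initial;
    }

    TelemetryState state() {
        return state;
    }

    // Simplified transition table covering only the edges this example uses.
    private static boolean isValid(TelemetryState from, TelemetryState to) {
        switch (to) {
            case PUSH_IN_PROGRESS:
                return from == TelemetryState.PUSH_NEEDED;
            case PUSH_NEEDED:
                return from == TelemetryState.PUSH_IN_PROGRESS;
            case TERMINATED:
                return TERMINATING.contains(from);
            default:
                return false;
        }
    }

    private void maybeTransitionTo(TelemetryState next) {
        if (!isValid(state, next)) {
            // The transition is refused, but a warning is still logged.
            warnings.add("Invalid transition from " + state + " to " + next);
            return;
        }
        state = next;
    }

    /** Handles a PushTelemetry response. */
    void onPushResponse() {
        if (TERMINATING.contains(state)) {
            // Shutting down: remain in the current state instead of
            // attempting the invalid move back to PUSH_NEEDED.
            return;
        }
        maybeTransitionTo(TelemetryState.PUSH_NEEDED);
    }
}
```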
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]

2024-05-14 Thread via GitHub


mumrah commented on PR #15838:
URL: https://github.com/apache/kafka/pull/15838#issuecomment-2110742012

   The new test seems to be failing: [`testReduceNumNetworkThreads() – kafka.server.KRaftClusterTest`](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15838/1/tests) can you check on it?





[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14567:
--
Fix Version/s: 3.8.0

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Blocker
>  Labels: eos
> Fix For: 3.8.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced thread crashed, resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream task 1_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_2, processor=KSTREAM-SOURCE-05, topic=node-name-repartition, partition=2, offset=539776276, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at org.apache.kafka.streams.processor.
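
The root error in the trace is a rejected state transition in the producer's transaction state machine: the producer is already in `FATAL_ERROR` (presumably as a consequence of the fencing), a subsequent `send` tries to move it to `ABORTABLE_ERROR`, and the resulting `IllegalStateException` surfaces through Kafka Streams as a `StreamsException`. The sketch below is a hypothetical, simplified model that reproduces the shape of that message. It is not `TransactionManager`, whose real transition rules are more detailed, and the rule that `FATAL_ERROR` is terminal is an assumption made for illustration.

```java
// Hypothetical, heavily simplified transactional producer states.
enum TxnState { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

final class TxnStateMachine {
    private final String transactionalId;
    private TxnState state = TxnState.READY;

    TxnStateMachine(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    TxnState state() {
        return state;
    }

    // Assumed rule for this sketch: FATAL_ERROR is terminal; the only
    // transition allowed out of it is back into FATAL_ERROR itself.
    private static boolean isValid(TxnState from, TxnState to) {
        return from != TxnState.FATAL_ERROR || to == TxnState.FATAL_ERROR;
    }

    void transitionTo(TxnState target) {
        if (!isValid(state, target)) {
            throw new IllegalStateException("TransactionalId " + transactionalId
                + ": Invalid transition attempted from state " + state + " to state " + target);
        }
        state = target;
    }

    public static void main(String[] args) {
        TxnStateMachine producer = new TxnStateMachine("example-txn-id");
        producer.transitionTo(TxnState.IN_TRANSACTION);
        producer.transitionTo(TxnState.FATAL_ERROR);         // e.g. after being fenced
        try {
            producer.transitionTo(TxnState.ABORTABLE_ERROR); // a later send error
        } catch (IllegalStateException e) {
            // prints "TransactionalId example-txn-id: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR"
            System.out.println(e.getMessage());
        }
    }
}
```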

[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14567:
--
Affects Version/s: 3.7.0
   (was: 3.8.0)

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Blocker
>  Labels: eos
>

[jira] [Assigned] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-14567:
-

Assignee: Kirk True

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Major
>  Labels: eos
>

[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14567:
--
Priority: Blocker  (was: Major)

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Blocker
>  Labels: eos
>

[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14567:
--
Affects Version/s: 3.8.0

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Major
>  Labels: eos
>

Re: [PR] KAFKA-16686: Wait for given offset in TopicBasedRemoteLogMetadataManagerTest [kafka]

2024-05-14 Thread via GitHub


gaurav-narula commented on PR #15885:
URL: https://github.com/apache/kafka/pull/15885#issuecomment-2110720158

   @chia7712 it looks like we still suffer from thread leaks in CI :( I've rebased on trunk to trigger CI again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-14 Thread via GitHub


gaurav-narula commented on code in PR #15945:
URL: https://github.com/apache/kafka/pull/15945#discussion_r1600362319


##
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+    private final Logger log;
+    private final int id;
+    private final Runnable refreshRegistrationCallback;
+
+    /**
+     * Create the tracker.
+     *
+     * @param id                            The ID of this broker.
+     * @param targetDirectories             The directories managed by this broker.
+     * @param refreshRegistrationCallback   Callback to run if we need to refresh the registration.
+     */
+    public BrokerRegistrationTracker(
+        int id,
+        List targetDirectories,
+        Runnable refreshRegistrationCallback
+    ) {
+        this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] ").
+            logger(BrokerRegistrationTracker.class);
+        this.id = id;
+        this.refreshRegistrationCallback = refreshRegistrationCallback;
+    }
+
+    @Override
+    public String name() {
+        return "BrokerRegistrationTracker(id=" + id + ")";
+    }
+
+    @Override
+    public void onMetadataUpdate(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LoaderManifest manifest
+    ) {
+        boolean checkBrokerRegistration = false;
+        if (delta.featuresDelta() != null) {
+            if (delta.metadataVersionChanged().isPresent()) {

Review Comment:
   Is this only for the logging? It seems like we unconditionally pass `newImage.features().metadataVersion()` to `brokerRegistrationNeedsRefresh` further down, and it only gates on `registration == null` 🤔 
   
   I tried commenting out lines 78-86 and the tests still pass, which makes me wonder whether `checkBrokerRegistration` can be simplified or whether there's a test we're missing.
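The javadoc in this hunk describes the refresh rule as a comparison of desired and actual state. A minimal sketch of that decision, assuming the only trigger is a registration with no directories on a cluster at 3.7-IV2 or later; `RegistrationRefreshSketch`, `Mv`, and `Registration` are hypothetical stand-ins, not Kafka's `MetadataVersion` or `BrokerRegistration`, and directories are modeled as strings rather than `Uuid`s:

```java
import java.util.List;

// Hypothetical sketch of the "compare desired state with actual state" check.
// Mv and Registration are simplified stand-ins, not Kafka classes.
final class RegistrationRefreshSketch {

    // Only the ordering relative to 3.7-IV2 matters for this decision.
    enum Mv {
        IBP_3_6_IV2, IBP_3_7_IV0, IBP_3_7_IV1, IBP_3_7_IV2, IBP_3_8_IV0;

        boolean supportsDirectories() {
            return compareTo(IBP_3_7_IV2) >= 0;
        }
    }

    // The only part of a broker registration this check looks at.
    static final class Registration {
        final List<String> directories;

        Registration(List<String> directories) {
            this.directories = directories;
        }
    }

    private RegistrationRefreshSketch() { }

    // Returns true if the broker should resend its BrokerRegistrationRequest.
    static boolean needsRefresh(Mv mv, Registration registration) {
        if (registration == null) {
            // The broker is not in the metadata image yet, so there is nothing to refresh.
            return false;
        }
        // Registered without directories (pre-JBOD), but the cluster now runs at
        // 3.7-IV2 or later, so the registration is out of date.
        return mv.supportsDirectories() && registration.directories.isEmpty();
    }
}
```

In this sketch the decision is a pure function of the current metadata version and the current registration, so it does not depend on whether a particular delta changed the features, which is the property the review comment is probing.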




[PR] KAFKA-15045: (KIP-924 pt. 4) Generify rack graph solving utilities [kafka]

2024-05-14 Thread via GitHub


apourchet opened a new pull request, #15956:
URL: https://github.com/apache/kafka/pull/15956

   The graph solving utilities are currently hardcoded to work with 
ClientState, but don't actually depend on anything in those state classes.
   
   This change allows the MinTrafficGraphConstructor and 
BalanceSubtopologyGraphConstructor to be reused with KafkaStreamsStates instead.
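One way to remove a hard dependency on a concrete state class is to make the utility generic over the state type and pass in accessors for the few properties it reads. The sketch below is a hypothetical helper (`RackGrouping.groupByRack` is not part of Kafka Streams), and the PR may use an interface bound rather than a `Function`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical generic utility: it works for any client-state type T, as long as
// the caller says how to read a rack from it.
final class RackGrouping {

    private RackGrouping() { }

    static <T> Map<String, List<T>> groupByRack(Collection<T> clients, Function<? super T, String> rackOf) {
        // TreeMap keeps racks sorted so iteration order is deterministic.
        Map<String, List<T>> byRack = new TreeMap<>();
        for (T client : clients) {
            byRack.computeIfAbsent(rackOf.apply(client), rack -> new ArrayList<>()).add(client);
        }
        return byRack;
    }
}
```

Because the method never names a concrete state class, the same code can be called with two unrelated types, each supplying its own rack accessor.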
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-05-14 Thread Luke D (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846377#comment-17846377
 ] 

Luke D commented on KAFKA-16361:


[~ableegoldman], our local reproduction test case shows that the bug is not 
present in 3.4. The APIs needed to introduce the conditions we are testing 
are not present in 3.3 and below. So, for our specific presentation of the 
bug, it appears to be a regression introduced in 3.5. 
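The exception quoted in the issue refers to the balance targets of the constrained sticky assignment. As we read the error message, with P partitions and C members each member should end up with floor(P / C) (minQuota) or ceil(P / C) (maxQuota) partitions, and exactly P % C members should reach maxQuota. A minimal sketch of that arithmetic and the resulting invariant; `StickyQuotas` is hypothetical and not `AbstractStickyAssignor` code:

```java
// Hypothetical helper: the balance targets that a constrained sticky assignment
// aims for, where every member ends up with either minQuota or maxQuota partitions.
final class StickyQuotas {
    final int minQuota;
    final int maxQuota;
    final int membersAtMaxQuota;

    StickyQuotas(int totalPartitions, int members) {
        if (members <= 0) {
            throw new IllegalArgumentException("members must be positive");
        }
        this.minQuota = totalPartitions / members;                                 // floor(P / C)
        this.maxQuota = minQuota + (totalPartitions % members == 0 ? 0 : 1);       // ceil(P / C)
        this.membersAtMaxQuota = totalPartitions % members;                        // members with one extra
    }

    // True if every count is within [minQuota, maxQuota] and exactly
    // membersAtMaxQuota members received the extra partition.
    boolean isBalanced(int[] partitionCounts) {
        int atMax = 0;
        for (int count : partitionCounts) {
            if (count < minQuota || count > maxQuota) {
                return false;
            }
            if (count == maxQuota && maxQuota > minQuota) {
                atMax++;
            }
        }
        return atMax == membersAtMaxQuota;
    }
}
```

For 100 partitions and 3 members this gives quotas of 33 and 34 with one member at 34; an assignment of 33, 33, 33 fails the check because no member reached maxQuota.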

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.1, 3.7.0, 3.6.1
>Reporter: Luke D
>Priority: Major
> Attachments: illegalstateexception.log
>
>
> In some low topic replication scenarios, the rack-aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on 
> the application implementation). While uncommon, the error is deterministic, 
> and so persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1 and 3.6.1. We have reproduced it 
> locally in a test case in 3.6.1 and 3.7.0 (we did not try 3.5.1, but it would 
> likely also be reproducible there). 
>  
> Here are the error and stack trace from our test case against 3.7.0:
> {code:java}
> We haven't reached the expected number of members with more than the minQuota 
> partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of 
> members with more than the minQuota partitions, but no more partitions to be 
> assigned
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 
> (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: 
> rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader 
> = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 
> 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = 
> 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 33, leader = 
> 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = 
> topic_name, 

[PR] MINOR: Fix warnings in streams javadoc [kafka]

2024-05-14 Thread via GitHub


mimaison opened a new pull request, #15955:
URL: https://github.com/apache/kafka/pull/15955

   Fixes the following warnings (`./gradlew streams:javadoc`):
   
   ```
   > Task :streams:javadoc
   
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java:74: warning - @param argument "assignment:" is not a parameter name.
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java:74: warning - @param argument "subscription:" is not a parameter name.
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java:74: warning - @param argument "error:" is not a parameter name.
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Missing closing '}' character for inline tag: "{@code auto.include.jmx.reporter"
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Tag @link: reference not found: JMX_REPORTER "jmx.reporter"
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:579: warning - Missing closing '}' character for inline tag: "{@code default.windowed.key.serde.inner"
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:587: warning - Missing closing '}' character for inline tag: "{@code default.windowed.value.serde.inner"
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Tag @link: reference not found: JMX_REPORTER "jmx.reporter"
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Tag @link: reference not found: JMX_REPORTER "jmx.reporter"
   /Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Tag @link: reference not found: JMX_REPORTER "jmx.reporter"
   ```
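The warnings above fall into three patterns: a colon after a `@param` name, an unclosed `{@code ...}` inline tag, and a `{@link ...}` whose target javadoc cannot resolve. The hypothetical class below (not Kafka Streams code, and not the change made in this PR) shows forms of each tag that javadoc accepts; `EXAMPLE_CONFIG` is an invented constant used only as a resolvable link target:

```java
/**
 * Hypothetical class illustrating javadoc tag forms that avoid the kinds of
 * warnings quoted in the PR description.
 */
final class JavadocExamples {

    /** Hypothetical constant, used only as a resolvable {@code @link} target. */
    static final String EXAMPLE_CONFIG = "auto.include.jmx.reporter";

    private JavadocExamples() { }

    /**
     * Joins two values. Inline tags are closed, e.g. {@code auto.include.jmx.reporter},
     * and links point at members that exist, e.g. {@link #EXAMPLE_CONFIG}.
     *
     * @param assignment   the first value; no colon follows the parameter name
     * @param subscription the second value
     * @return the two values joined with a slash
     */
    static String join(String assignment, String subscription) {
        return assignment + "/" + subscription;
    }
}
```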
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14

2024-05-14 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16763:
--

 Summary: Upgrade to scala 2.12.19 and scala 2.13.14
 Key: KAFKA-16763
 URL: https://issues.apache.org/jira/browse/KAFKA-16763
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19)

 

scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-14 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600314641


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+

Review Comment:
   I wanted to make it return REBALANCE_IN_PROGRESS, but we seem to have no way to know whether a new rebalance is triggered in sync group...






Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-14 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600311785


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+
+        cancelConsumerGroupSyncTimeout(groupId, memberId);
+        //scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout());

Review Comment:
   Needs https://github.com/apache/kafka/pull/15921






[PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-14 Thread via GitHub


dongnuo123 opened a new pull request, #15954:
URL: https://github.com/apache/kafka/pull/15954

   This PR implements the SyncGroup API for consumer groups that are in mixed mode. 
   
   In `classicGroupSyncToConsumerGroup`, the `assignedPartitions` calculated during JoinGroup are returned as the assignment in the SyncGroup response, and the member session timeout is rescheduled.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16699: Have Streams treat InvalidPidMappingException like a ProducerFencedException [kafka]

2024-05-14 Thread via GitHub


wcarlson5 merged PR #15919:
URL: https://github.com/apache/kafka/pull/15919





[jira] [Created] (KAFKA-16762) SyncGroup API for upgrading ConsumerGroup

2024-05-14 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-16762:
---

 Summary: SyncGroup API for upgrading ConsumerGroup
 Key: KAFKA-16762
 URL: https://issues.apache.org/jira/browse/KAFKA-16762
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu








[jira] [Assigned] (KAFKA-16762) SyncGroup API for upgrading ConsumerGroup

2024-05-14 Thread Dongnuo Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongnuo Lyu reassigned KAFKA-16762:
---

Assignee: Dongnuo Lyu

> SyncGroup API for upgrading ConsumerGroup
> -
>
> Key: KAFKA-16762
> URL: https://issues.apache.org/jira/browse/KAFKA-16762
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>






Re: [PR] KAFKA-16699: Have Streams treat InvalidPidMappingException like a ProducerFencedException [kafka]

2024-05-14 Thread via GitHub


wcarlson5 commented on PR #15919:
URL: https://github.com/apache/kafka/pull/15919#issuecomment-2110611187

   There were no new or related test failures.





Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1600301660


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
 }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   Is it possible that `LeaderAndIsrRequest` carries a different topic id?






[jira] [Commented] (KAFKA-16539) Can't update specific broker configs in pre-migration mode

2024-05-14 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846365#comment-17846365
 ] 

David Arthur commented on KAFKA-16539:
--

[~chia7712] here's a cherry-pick to 3.7 
https://github.com/apache/kafka/pull/15953

> Can't update specific broker configs in pre-migration mode
> --
>
> Key: KAFKA-16539
> URL: https://issues.apache.org/jira/browse/KAFKA-16539
> Project: Kafka
>  Issue Type: Bug
>  Components: config, kraft
>Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
>
> In migration mode, ZK brokers will have a forwarding manager configured. This 
> is used to forward requests to the KRaft controller once we get to that part 
> of the migration. However, prior to KRaft taking over as the controller 
> (known as pre-migration mode), the ZK brokers are still attempting to forward 
> IncrementalAlterConfigs to the controller.
> This works fine for cluster-level configs (e.g., "--entity-type broker 
> --entity-default"), but it fails for specific broker configs (e.g., 
> "--entity-type broker --entity-id 1").
> This affects the BROKER and BROKER_LOGGER config types.
> To work around this bug, you can either disable migrations on the brokers 
> (assuming no migration has taken place) or proceed with the migration and 
> get to the point where KRaft is the controller.





[PR] Cherry-pick KAFKA-16539 to 3.7 [kafka]

2024-05-14 Thread via GitHub


mumrah opened a new pull request, #15953:
URL: https://github.com/apache/kafka/pull/15953

   This patch fixes two issues with IncrementalAlterConfigs and the ZK 
migration. First, it changes the handling of IncrementalAlterConfigs to check 
if the controller is ZK vs KRaft and only forward for KRaft. Second, it adds a 
check in KafkaZkClient#setOrCreateEntityConfigs to ensure a ZK broker is not 
directly modifying configs in ZK if there is a KRaft controller. This closes 
the race condition between KRaft taking over as the active controller and the 
ZK brokers learning about this.
   
   *Forwarding*
   
   During the ZK migration, there is a time when the ZK brokers are running 
with migrations enabled, but KRaft has yet to take over as the controller. 
Prior to KRaft taking over as the controller, the ZK brokers in migration mode 
were unconditionally forwarding IncrementalAlterConfigs (IAC) to the ZK 
controller. This works for some config types, but breaks when setting BROKER 
and BROKER_LOGGER configs for a specific broker. The behavior in KafkaApis for 
IAC was to always forward if the forwarding manager was defined. Since ZK 
brokers in migration mode have forwarding enabled, the forwarding would happen, 
and the special logic for BROKER and BROKER_LOGGER would be missed, causing the 
request to fail.
   
   With this fix, the IAC handler will check if the controller is KRaft or ZK 
and only forward for KRaft.
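The routing change can be modeled as a small decision function. A minimal sketch, assuming the broker knows whether it has a forwarding manager and which kind of controller is active; `IacRouting`, `ControllerType`, and `Route` are hypothetical names, not the actual `KafkaApis` logic, and `HANDLE_LOCALLY` stands for the ZK-mode path that includes the special BROKER and BROKER_LOGGER handling:

```java
// Hypothetical model of the IncrementalAlterConfigs routing rule, before and after the fix.
final class IacRouting {

    enum ControllerType { ZK, KRAFT }

    enum Route { FORWARD_TO_CONTROLLER, HANDLE_LOCALLY }

    private IacRouting() { }

    // Old behavior: forward whenever a forwarding manager exists, even in pre-migration mode.
    static Route routeBeforeFix(boolean hasForwardingManager) {
        return hasForwardingManager ? Route.FORWARD_TO_CONTROLLER : Route.HANDLE_LOCALLY;
    }

    // New behavior: forward only when the active controller is a KRaft controller.
    static Route routeAfterFix(boolean hasForwardingManager, ControllerType activeController) {
        if (hasForwardingManager && activeController == ControllerType.KRAFT) {
            return Route.FORWARD_TO_CONTROLLER;
        }
        return Route.HANDLE_LOCALLY;
    }
}
```

The pre-migration case is the one where the two functions disagree: a forwarding manager is present, but the active controller is still a ZK controller.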
   
   *Protected ZK Writes*
   
   As part of KIP-500, we moved most (but not all) ZK mutations to the ZK 
controller. One of the things we did not move fully to the controller was 
entity configs. This is because there was some special logic that needed to run 
on the broker for certain config updates. If a broker-specific config was set, 
AdminClient would route the request to the proper broker. In KRaft, we have a 
different mechanism for handling broker-specific config updates.
   
   Leaving this ZK update on the broker side would be okay if we were guarding 
writes on the controller epoch, but it turns out 
KafkaZkClient#setOrCreateEntityConfigs does unprotected "last writer wins" 
updates to ZK. This means a ZK broker could update the contents of ZK after the 
metadata had been migrated to KRaft. No good! To fix this, this patch adds a 
check on the controller epoch to KafkaZkClient#setOrCreateEntityConfigs but 
also adds logic to fail the update if the controller is a KRaft controller.
   
   The new logic in setOrCreateEntityConfigs adds STALE_CONTROLLER_EPOCH as a 
new exception that can be thrown while updating configs.
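The protected write amounts to a conditional update: the write succeeds only if the caller's view of the controller epoch is still current and the active controller is not KRaft. A minimal in-memory sketch of that rule; `GuardedConfigStore` and its exception are hypothetical stand-ins for `KafkaZkClient#setOrCreateEntityConfigs` and the STALE_CONTROLLER_EPOCH error, and the sketch does not model ZooKeeper itself:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for an epoch-guarded entity config write.
final class GuardedConfigStore {

    static final class StaleControllerEpochException extends RuntimeException {
        StaleControllerEpochException(String message) {
            super(message);
        }
    }

    private final Map<String, Map<String, String>> configsByEntity = new HashMap<>();
    private int controllerEpoch;
    private boolean kraftController;

    // Electing a new controller (ZK or KRaft) bumps the epoch.
    synchronized void electController(boolean isKraft) {
        controllerEpoch++;
        kraftController = isKraft;
    }

    synchronized int controllerEpoch() {
        return controllerEpoch;
    }

    // Rejects the write if a KRaft controller is active or the caller's epoch is stale.
    synchronized void setOrCreate(String entity, Map<String, String> configs, int expectedEpoch) {
        if (kraftController) {
            throw new StaleControllerEpochException("KRaft controller is active; refusing config write");
        }
        if (expectedEpoch != controllerEpoch) {
            throw new StaleControllerEpochException(
                "Expected controller epoch " + expectedEpoch + " but current is " + controllerEpoch);
        }
        configsByEntity.put(entity, new HashMap<>(configs));
    }

    synchronized Map<String, String> get(String entity) {
        return configsByEntity.get(entity);
    }
}
```

The methods are `synchronized` so that the check and the write happen atomically in this in-memory model; against ZooKeeper, the same guarantee would instead rely on a conditional write rather than a local lock.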
   
   Reviewers:  Luke Chen , Akhilesh Chaganti 
, Chia-Ping Tsai 
   
   Conflicts: Minor conflicts due to the config refactoring in trunk
   





[jira] [Updated] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16406:
---
Labels: consumer kip-848-client-support  (was: )

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer, kip-848-client-support
> Fix For: 3.8.0
>
>
> PlaintextConsumerTest contains integration tests for the consumer. Since the 
> introduction of the new consumer group protocol (KIP-848) and the new 
> KafkaConsumer, this test has been parametrized to run with multiple 
> combinations, making sure we test the logic for the old and new coordinator, 
> as well as for the legacy and new KafkaConsumer. 
> This made it one of the longest-running integration tests, so, with the aim 
> of reducing the impact on build times, we could split it to allow for 
> parallelization. The test covers multiple areas of the consumer logic in a 
> single file, so splitting it based on the high-level features being tested 
> would be sensible and achieve the desired result.





[jira] [Closed] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16406.
--

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0





[jira] [Resolved] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-16406.

Resolution: Fixed

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0





[jira] [Commented] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join

2024-05-14 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846363#comment-17846363
 ] 

Ayoub Omari commented on KAFKA-16700:
-

[~stoeckmk] KIP-962 is not about null foreign keys; it only concerns null keys 
in the left topic. Null foreign keys therefore still behave the same way as 
before the KIP.

 

> Kafka Streams: possible message loss on KTable-KTable FK Left Join
> --
>
> Key: KAFKA-16700
> URL: https://issues.apache.org/jira/browse/KAFKA-16700
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
> Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 
> 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka 
> Operators
>Reporter: Karsten Stöckmann
>Priority: Major
>  Labels: dsl, joins, streams
>
> We are experiencing significant, yet intermittent / non-deterministic / 
> unexplainable message loss on a Kafka Streams topology while performing a 
> *KTable-KTable* *FK Left Join*.
> Assume the following snippet:
> {code:java}
> streamsBuilder
> .table(
> folderTopicName,
> Consumed.with(
> folderKeySerde,
> folderSerde))
> .leftJoin(
> agencies, // KTable
> Folder::agencyIdValue,
> AggregateFolder::new,
> TableJoined.as("folder-to-agency"),
> Materialized
> .as("folder-to-agency-materialized")
> .withKeySerde(folderKeySerde)
> .withValueSerde(aggregateFolderSerde))
> .leftJoin(
> documents,
> {code}
> The setup is as follows:
> A Debezium Connector for PostgreSQL streams database changes into various 
> Kafka topics. A series of Quarkus Kafka Streams applications then performs 
> aggregation operations on those topics to create index documents later to be 
> sent into an OpenSearch system.
> When firing up the Kafka Streams infrastructure to work on initially 
> populated Kafka Topics (i.e. a snapshot of all relevant table data has been 
> streamed to Kafka), the KTable-KTable FK Left Join shown above seems to 
> lose messages on the first of a series of FK Left Joins; the right-hand 
> {{KTable}} is consumed from an aggregated 
> topic fed from another Kafka Streams topology / application.
> On a (heavily reduced) test data set of 6828 messages in the 
> {{folderTopicName}} Topic, we observe the following results:
>  * {{folder-to-agency-subscription-registration}}: *6967* messages
>  * {{folder-to-agency-subscription-response}}: *3048* messages
>  * {{folder-to-agency-subscription-store-changelog}}: *6359* messages
>  * {{folder-to-agency-materialized-changelog}}: *4644* messages.
> Given the semantics of an (FK) Left Join, I'd expect every message from the 
> left-hand topic to produce an aggregate, even if no matching message is 
> found in the right-hand topic.
> Message loss varies unpredictably across tests and does not seem to be bound 
> to specific keys or messages.
> It seems this can only be observed when initially firing up the Streams 
> infrastructure to process the message 'backlog' that had been snapshotted by 
> Debezium. A manual snapshot triggered later (i.e. with the Streams applications 
> already running) does not seem to show this behaviour. Additionally, so far we 
> have observed this kind of message loss only when running multiple replicas of 
> the affected application. When carrying out the tests with only one replica, 
> everything seems to work as expected. We tried to leverage 
> {{group.initial.rebalance.delay.ms}} in order to rule out possible 
> rebalancing issues, but to no avail.
> Our Kafka configuration:
> {code:yaml}
> offsets.topic.replication.factor: 3
> transaction.state.log.replication.factor: 3
> transaction.state.log.min.isr: 2
> default.replication.factor: 3
> min.insync.replicas: 2
> message.max.bytes: "20971520"
> {code}
> Our Kafka Streams application configuration:
> {code:yaml}
> kafka-streams.num.stream.threads: 5
> kafka-streams.num.standby.replicas: 1
> kafka-streams.auto.offset.reset: earliest
> kafka-streams.cache.max.bytes.buffering: "20971520"
> kafka-streams.commit.interval.ms: 100
> kafka-streams.fetch.max.bytes: "10485760"
> kafka-streams.max.request.size: "10485760"
> kafka-streams.max.partition.fetch.bytes: "10485760"
> kafka-streams.metadata.max.age.ms: 30
> kafka-streams.statestore.cache.max.bytes: "20971520"
> kafka-streams.topology.optimization: all
> kafka-streams.processing.guarantee: exactly_once_v2
> # Kafka Streams Intermediate Topics
> kafka-streams.topic.compression.type: lz4
> kafk
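As a reference point for the expectation stated above, the following plain-Java sketch models foreign-key left-join semantics over in-memory maps: every left-hand record yields exactly one result, with a null right-hand value when its foreign key has no match. It is not how Kafka Streams implements the join (which goes through the subscription/response topics listed above); the class name, data, and the choice to treat a null foreign key as "no match" are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, in-memory model of FK left-join semantics; not the Kafka Streams implementation.
public class FkLeftJoinSketch {

    // Result of joining one left record with its (possibly missing) right record.
    public record Joined<L, R>(L left, R right) { }

    public static <K, L, FK, R> Map<K, Joined<L, R>> fkLeftJoin(
            Map<K, L> leftTable,
            Map<FK, R> rightTable,
            Function<L, FK> foreignKeyExtractor) {
        Map<K, Joined<L, R>> result = new LinkedHashMap<>();
        for (Map.Entry<K, L> entry : leftTable.entrySet()) {
            FK fk = foreignKeyExtractor.apply(entry.getValue());
            // Assumption: a null foreign key is treated as "no match".
            R right = fk == null ? null : rightTable.get(fk);
            // Left-join semantics: emit a result even when no right-hand match exists.
            result.put(entry.getKey(), new Joined<>(entry.getValue(), right));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical folders keyed by folder id, each referencing an agency id.
        Map<String, String> folders = new LinkedHashMap<>();
        folders.put("f1", "a1");
        folders.put("f2", "a2"); // no matching agency
        folders.put("f3", "a1");

        Map<String, String> agencies = Map.of("a1", "Agency One");

        Map<String, Joined<String, String>> joined =
            fkLeftJoin(folders, agencies, Function.identity());

        // One output per left record, including f2 whose right side is null.
        System.out.println(joined.size());
        System.out.println(joined.get("f2").right());
    }
}
```

Under these semantics the output count always equals the number of left-hand records, which is why the lower counts reported above point to lost records rather than unmatched ones.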

Re: [PR] KAFKA-16671: enable test for ensureInternalEndpointIsSecured [kafka]

2024-05-14 Thread via GitHub


FrankYang0529 commented on code in PR #15868:
URL: https://github.com/apache/kafka/pull/15868#discussion_r1600286651


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java:
##
@@ -108,8 +110,8 @@ public void ensureInternalEndpointIsSecured() throws Throwable {
 connectorTasksEndpoint
 );
 assertEquals(
-BAD_REQUEST.getStatusCode(),
-connect.requestPost(connectorTasksEndpoint, "[]", emptyHeaders).getStatus()
+BAD_REQUEST.getStatusCode(),

Review Comment:
   Revert it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16759) Invalid client telemetry transition on consumer close

2024-05-14 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846360#comment-17846360
 ] 

Andrew Schofield commented on KAFKA-16759:
--

Analysing the transitions confirms that the invalid transition is real.
{noformat}
+++ SUBSCRIPTION_NEEDED -->> SUBSCRIPTION_IN_PROGRESS
+++ SUBSCRIPTION_IN_PROGRESS -->> PUSH_NEEDED
^C+++ PUSH_NEEDED -->> TERMINATING_PUSH_NEEDED
+++ TERMINATING_PUSH_NEEDED -->> TERMINATING_PUSH_IN_PROGRESS
+++ TERMINATING_PUSH_IN_PROGRESS -->> PUSH_NEEDED
[2024-05-14 16:48:05,043] WARN Error updating client telemetry state, disabled 
telemetry (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
java.lang.IllegalStateException: Invalid telemetry state transition from 
TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED; the valid telemetry state 
transitions from TERMINATING_PUSH_IN_PROGRESS are: TERMINATED
at 
org.apache.kafka.common.telemetry.ClientTelemetryState.validateTransition(ClientTelemetryState.java:163)
at 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.maybeSetState(ClientTelemetryReporter.java:828)
at 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.handleResponse(ClientTelemetryReporter.java:520)
at 
org.apache.kafka.clients.NetworkClient$TelemetrySender.handleResponse(NetworkClient.java:1321)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:948)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:594)
at 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.poll(NetworkClientDelegate.java:130)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:140)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)
[2024-05-14 16:48:05,044] WARN Unable to transition state after successful push 
telemetry from state TERMINATING_PUSH_IN_PROGRESS 
(org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
+++ TERMINATING_PUSH_IN_PROGRESS -->> TERMINATED{noformat}
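To make the failing step in the log above concrete, here is a simplified, hypothetical model of the transition check. It is not Kafka's `ClientTelemetryState`: it contains only the states and edges that appear in the log (the accepted transitions, plus the single valid successor TERMINATED named in the exception), and replays the logged sequence to find the first transition the model rejects.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model limited to the states and transitions visible in the log.
// Not Kafka's ClientTelemetryState.
public class TelemetryTransitionReplay {

    public enum State {
        SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED,
        TERMINATING_PUSH_NEEDED, TERMINATING_PUSH_IN_PROGRESS, TERMINATED
    }

    // Allowed next states per state (only the edges observed in the log).
    static final Map<State, Set<State>> VALID = Map.of(
        State.SUBSCRIPTION_NEEDED, EnumSet.of(State.SUBSCRIPTION_IN_PROGRESS),
        State.SUBSCRIPTION_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED),
        State.PUSH_NEEDED, EnumSet.of(State.TERMINATING_PUSH_NEEDED),
        State.TERMINATING_PUSH_NEEDED, EnumSet.of(State.TERMINATING_PUSH_IN_PROGRESS),
        State.TERMINATING_PUSH_IN_PROGRESS, EnumSet.of(State.TERMINATED),
        State.TERMINATED, EnumSet.noneOf(State.class));

    public static boolean isValid(State from, State to) {
        return VALID.get(from).contains(to);
    }

    // Returns the index of the first rejected transition in the sequence, or -1 if all are valid.
    public static int firstInvalid(List<State> sequence) {
        for (int i = 1; i < sequence.size(); i++) {
            if (!isValid(sequence.get(i - 1), sequence.get(i))) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The sequence printed in the log, ending with the rejected PUSH_NEEDED step.
        List<State> logged = List.of(
            State.SUBSCRIPTION_NEEDED, State.SUBSCRIPTION_IN_PROGRESS, State.PUSH_NEEDED,
            State.TERMINATING_PUSH_NEEDED, State.TERMINATING_PUSH_IN_PROGRESS, State.PUSH_NEEDED);
        int bad = firstInvalid(logged);
        System.out.println(logged.get(bad - 1) + " -> " + logged.get(bad));
    }
}
```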

> Invalid client telemetry transition on consumer close
> -
>
> Key: KAFKA-16759
> URL: https://issues.apache.org/jira/browse/KAFKA-16759
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Minor
> Fix For: 3.8.0
>
>
> Using the console consumer with client telemetry enabled, I hit an invalid 
> state transition when closing the consumer with CTRL-C. The consumer sends a 
> final "terminating" telemetry push which puts the client telemetry reporter 
> into TERMINATING_PUSH_IN_PROGRESS state. When it receives a response in this 
> state, it attempts an invalid state transition.
>  
> {noformat}
> [2024-05-13 19:19:35,804] WARN Error updating client telemetry state, 
> disabled telemetry 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
> java.lang.IllegalStateException: Invalid telemetry state transition from 
> TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED; the valid telemetry state 
> transitions from TERMINATING_PUSH_IN_PROGRESS are: TERMINATED
>   at 
> org.apache.kafka.common.telemetry.ClientTelemetryState.validateTransition(ClientTelemetryState.java:163)
>   at 
> org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.maybeSetState(ClientTelemetryReporter.java:827)
>   at 
> org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.handleResponse(ClientTelemetryReporter.java:520)
>   at 
> org.apache.kafka.clients.NetworkClient$TelemetrySender.handleResponse(NetworkClient.java:1321)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:948)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:594)
>   at 
> org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.poll(NetworkClientDelegate.java:130)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.sendUnsentRequests(ConsumerNetworkThread.java:262)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.cleanup(ConsumerNetworkThread.java:275)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:95)
> [2024-05-13 19:19:35,805] WARN Unable to transition state after successful 
> push telemetry from state TERMINATING_PUSH_IN_PROGRESS 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter){noformat}
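One possible shape of a fix, sketched under assumptions: when a push response arrives, choose the next state based on whether the reporter is terminating, so a response received in TERMINATING_PUSH_IN_PROGRESS moves to TERMINATED instead of PUSH_NEEDED. The class and method names are hypothetical, and the PUSH_IN_PROGRESS state for the non-terminating case is assumed rather than shown in the log; this is not the actual `ClientTelemetryReporter` code.

```java
// Hypothetical sketch: pick the post-response state based on whether the reporter is terminating.
// Names are illustrative, not Kafka's API.
public class PushResponseHandling {

    public enum State { PUSH_IN_PROGRESS, PUSH_NEEDED, TERMINATING_PUSH_IN_PROGRESS, TERMINATED }

    public static State nextStateAfterPushResponse(State current) {
        switch (current) {
            case PUSH_IN_PROGRESS:
                // Normal operation: schedule the next periodic push.
                return State.PUSH_NEEDED;
            case TERMINATING_PUSH_IN_PROGRESS:
                // The final push has completed; the only valid next state is TERMINATED.
                return State.TERMINATED;
            default:
                throw new IllegalStateException("No push in progress in state " + current);
        }
    }

    public static void main(String[] args) {
        System.out.println(nextStateAfterPushResponse(State.TERMINATING_PUSH_IN_PROGRESS));
    }
}
```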





Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-14 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1600269458


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,7 +42,7 @@
 import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements ProcessorSupplier {

Review Comment:
   Just to throw my own explanation in here:
   VLeft and VRight are "absolute" in that they are the left and right types of 
the overall join. Both sides of the join have the same VLeft and VRight types 
because they share a common outerJoinStore instance.
   
   VThis and VOther are "relative" in that they are the types of the records 
entering "this" side of the join and the "other" side of the join, so they are 
necessarily swapped on the other side of the join.
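The distinction above can be illustrated with a small, hypothetical class hierarchy. It is not Kafka's `KStreamKStreamJoin`; it only reuses the type-parameter names. Both sides share the absolute (VLeft, VRight) shape of a stored pair, while the relative (VThis, VOther) parameters are swapped between the left-side and right-side subclasses.

```java
// Hypothetical illustration of "absolute" vs "relative" join type parameters.
// These classes are not Kafka's KStreamKStreamJoin; they only mirror the naming.
public class JoinSidesSketch {

    // Absolute types: a pair in a shared store is always (left, right).
    public record LeftRight<VLeft, VRight>(VLeft left, VRight right) { }

    // Relative types: VThis is the record arriving on this side, VOther the one from the other side.
    public abstract static class JoinSide<VLeft, VRight, VThis, VOther> {
        public abstract LeftRight<VLeft, VRight> toLeftRight(VThis thisValue, VOther otherValue);
    }

    // On the left side, "this" is the left value: VThis = VLeft and VOther = VRight.
    public static final class LeftSide<VLeft, VRight> extends JoinSide<VLeft, VRight, VLeft, VRight> {
        @Override
        public LeftRight<VLeft, VRight> toLeftRight(VLeft thisValue, VRight otherValue) {
            return new LeftRight<>(thisValue, otherValue);
        }
    }

    // On the right side the relative types are swapped: VThis = VRight and VOther = VLeft.
    public static final class RightSide<VLeft, VRight> extends JoinSide<VLeft, VRight, VRight, VLeft> {
        @Override
        public LeftRight<VLeft, VRight> toLeftRight(VRight thisValue, VLeft otherValue) {
            return new LeftRight<>(otherValue, thisValue);
        }
    }

    public static void main(String[] args) {
        JoinSide<String, Integer, String, Integer> left = new LeftSide<>();
        JoinSide<String, Integer, Integer, String> right = new RightSide<>();
        // Both sides produce the same absolute (left, right) shape.
        System.out.println(left.toLeftRight("a", 1));
        System.out.println(right.toLeftRight(1, "a"));
    }
}
```

Because the swap is encoded in the subclass type arguments, the compiler rejects passing a right-side value where a left-side value is expected, which is the kind of type-safety a boolean such as `isLeftSide` cannot provide.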






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600267022


##
clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.compress;
+
+import com.github.luben.zstd.BufferPool;
+import com.github.luben.zstd.RecyclingBufferPool;
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
+import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class ZstdCompression implements Compression {
+
+    public static final int MIN_LEVEL = Zstd.minCompressionLevel();
+    public static final int MAX_LEVEL = Zstd.maxCompressionLevel();
+    public static final int DEFAULT_LEVEL = Zstd.defaultCompressionLevel();
+
+    private final int level;
+
+    private ZstdCompression(int level) {
+        this.level = level;
+    }
+
+    @Override
+    public CompressionType type() {
+        return CompressionType.ZSTD;
+    }
+
+    @Override
+    public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+        try {
+            // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
+            // in cases where the caller passes a small number of bytes to write (potentially a single byte).
+            return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(bufferStream, RecyclingBufferPool.INSTANCE, level), 16 * 1024);
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    @Override
+    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        try {
+            return new ChunkedBytesStream(wrapForZstdInput(buffer, decompressionBufferSupplier),
+                decompressionBufferSupplier,
+                decompressionOutputSize(),
+                false);
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    // visible for testing
+    public ZstdInputStreamNoFinalizer wrapForZstdInput(ByteBuffer buffer, BufferSupplier decompressionBufferSupplier) throws IOException {

Review Comment:
   I considered inlining this method, as it's only used in a test in 
`DefaultRecordBatchTest` that essentially tests the internal behavior of 
`ZstdInputStreamNoFinalizer`.
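The hunk above wraps the zstd output stream in a 16 KB `BufferedOutputStream` because callers may write very small chunks. The JDK-only sketch below shows that effect: it counts how many write calls reach the underlying stream with and without the buffer. `CountingOutputStream` is a hypothetical stand-in for `ZstdOutputStreamNoFinalizer`; no zstd code is involved.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Shows how a 16 KB BufferedOutputStream coalesces single-byte writes before they reach
// the underlying stream. CountingOutputStream is a hypothetical stand-in for the compressor.
public class BufferingSketch {

    static final class CountingOutputStream extends OutputStream {
        int writeCalls = 0;

        @Override
        public void write(int b) {
            writeCalls++;
        }

        @Override
        public void write(byte[] b, int off, int len) {
            writeCalls++;
        }
    }

    // Writes `bytes` single bytes and returns how many write calls reached the sink.
    public static int callsFor(boolean buffered, int bytes) {
        CountingOutputStream sink = new CountingOutputStream();
        OutputStream out = buffered ? new BufferedOutputStream(sink, 16 * 1024) : sink;
        try {
            for (int i = 0; i < bytes; i++) {
                out.write(i); // caller writes one byte at a time
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.writeCalls;
    }

    public static void main(String[] args) {
        System.out.println("unbuffered: " + callsFor(false, 100_000));
        System.out.println("buffered:   " + callsFor(true, 100_000));
    }
}
```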






[jira] [Updated] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol

2024-05-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15284:
--
Fix Version/s: 4.0.0

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group 
> protocol
> ---
>
> Key: KAFKA-15284
> URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> At client initialization, we need to determine which of the 
> {{ConsumerDelegate}} implementations to use:
>  # {{LegacyKafkaConsumerDelegate}}
>  # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to 
> use the new protocol. This will be modeled by the—deep 
> breath—{{ConsumerGroupProtocolVersionResolver}}.
> Known tasks:
>  * Determine at what point in the {{Consumer}} initialization the network 
> communication should happen
>  * Determine what RPCs to invoke in order to determine eligibility (API 
> versions, IBP version, etc.)
>  * Implement the network client lifecycle (startup, communication, shutdown, 
> etc.)
>  * Determine the fallback path in case the client is not eligible to use the 
> protocol
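A minimal sketch of the decision such a resolver would ultimately make, assuming the eligibility checks listed above have already produced two booleans. The enum, method, and inputs are hypothetical, and falling back to the legacy delegate (rather than failing) when the broker is ineligible is an assumption; the issue leaves the fallback path as an open task.

```java
// Hypothetical sketch of the choice a ConsumerGroupProtocolVersionResolver might make.
// The eligibility inputs are assumptions; the issue lists how to determine them as open tasks.
public class DelegateSelectionSketch {

    public enum Delegate { LEGACY_KAFKA_CONSUMER_DELEGATE, ASYNC_KAFKA_CONSUMER_DELEGATE }

    // requestedNewProtocol: whether the user opted in to the KIP-848 protocol (assumed input).
    // brokerSupportsNewProtocol: outcome of the eligibility RPCs (assumed input).
    public static Delegate resolve(boolean requestedNewProtocol, boolean brokerSupportsNewProtocol) {
        if (requestedNewProtocol && brokerSupportsNewProtocol) {
            return Delegate.ASYNC_KAFKA_CONSUMER_DELEGATE;
        }
        // Assumed fallback path when the client is not eligible or did not opt in.
        return Delegate.LEGACY_KAFKA_CONSUMER_DELEGATE;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, false));
    }
}
```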





[jira] [Updated] (KAFKA-15697) Add local assignor and ensure it cannot be used with server side assignor

2024-05-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15697:
--
Fix Version/s: 4.0.0

> Add local assignor and ensure it cannot be used with server side assignor
> -
>
> Key: KAFKA-15697
> URL: https://issues.apache.org/jira/browse/KAFKA-15697
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> When we start supporting local (client-side) assignors, we should:
>  # Add the config to ConsumerConfig
>  # Determine where to implement the logic that ensures it is not used 
> alongside the server-side assignor, i.e. you can specify either a local or a 
> remote assignor, or neither.
>  ## If both assignors are specified: throw IllegalArgumentException
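The mutual-exclusion rule above could be validated roughly as follows. This is a sketch under assumptions: `group.local.assignors` is a purely hypothetical key for the local assignor config that the issue says still has to be added, `group.remote.assignor` is assumed to be the server-side assignor setting, and the example values are illustrative.

```java
import java.util.Map;

// Hypothetical validation: a consumer may configure a local assignor, a remote (server-side)
// assignor, or neither, but not both. "group.local.assignors" is an illustrative key only.
public class AssignorConfigCheck {

    static final String LOCAL_ASSIGNOR = "group.local.assignors"; // hypothetical key
    static final String REMOTE_ASSIGNOR = "group.remote.assignor"; // assumed server-side key

    public static void validate(Map<String, String> config) {
        boolean hasLocal = config.get(LOCAL_ASSIGNOR) != null;
        boolean hasRemote = config.get(REMOTE_ASSIGNOR) != null;
        if (hasLocal && hasRemote) {
            throw new IllegalArgumentException(
                "Only one of " + LOCAL_ASSIGNOR + " and " + REMOTE_ASSIGNOR + " may be set");
        }
    }

    public static void main(String[] args) {
        validate(Map.of(REMOTE_ASSIGNOR, "uniform")); // accepted: remote only
        try {
            validate(Map.of(LOCAL_ASSIGNOR, "my-assignor", REMOTE_ASSIGNOR, "uniform"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```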





[jira] [Updated] (KAFKA-15282) Implement client support for KIP-848 client-side assignors

2024-05-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15282:
--
Fix Version/s: 4.0.0

> Implement client support for KIP-848 client-side assignors
> --
>
> Key: KAFKA-15282
> URL: https://issues.apache.org/jira/browse/KAFKA-15282
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> With a client-side assignor, the partition assignment logic runs on the client 
> instead of on the server. Client-side assignment is the main approach used by 
> the “old protocol” for divvying up partitions. While the “new protocol” 
> favors server-side assignors, client-side assignors will continue to be 
> used for backward compatibility, including by KSQL, Connect, etc.
> Note: I _*think*_ that the client-side assignor logic and the reconciliation 
> logic can remain separate from each other. We should strive to keep the two 
> pieces decoupled unless coupling them proves unavoidable.
> This task includes:
>  * Validate the client’s configuration for assignor selection
>  * Integrate with the new {{PartitionAssignor}} interface to invoke the logic 
> from the user-provided assignor implementation
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupPrepareAssignment}} RPC call using the information from the 
> {{PartitionAssignor}} above
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupInstallAssignment}} RPC call, again using the information 
> calculated by the {{PartitionAssignor}}
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.
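As an illustration of "invoking the logic from the user-provided assignor implementation", here is a hypothetical, simplified interface with a round-robin implementation. The real `PartitionAssignor` interface is not described in the issue, so the interface shape and the use of plain strings for members and partitions are assumptions. The resulting map stands in for the information the client would then report via the ConsumerGroupInstallAssignment RPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for the new PartitionAssignor interface named in the
// issue (its real shape is not given there). Members and partitions are plain strings.
public class ClientSideAssignorSketch {

    public interface SimplePartitionAssignor {
        Map<String, List<String>> assign(List<String> members, List<String> partitions);
    }

    // A user-provided assignor: deal partitions to members in round-robin order.
    public static final class RoundRobin implements SimplePartitionAssignor {
        @Override
        public Map<String, List<String>> assign(List<String> members, List<String> partitions) {
            Map<String, List<String>> result = new TreeMap<>();
            if (members.isEmpty()) {
                return result; // nothing to assign to
            }
            for (String m : members) {
                result.put(m, new ArrayList<>());
            }
            for (int i = 0; i < partitions.size(); i++) {
                result.get(members.get(i % members.size())).add(partitions.get(i));
            }
            return result;
        }
    }

    public static void main(String[] args) {
        SimplePartitionAssignor assignor = new RoundRobin();
        System.out.println(assignor.assign(List.of("c1", "c2"), List.of("t-0", "t-1", "t-2")));
    }
}
```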





[jira] [Updated] (KAFKA-15279) Implement client support for KIP-848 client-side assigner RPCs

2024-05-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15279:
--
Fix Version/s: 4.0.0

> Implement client support for KIP-848 client-side assigner RPCs
> --
>
> Key: KAFKA-15279
> URL: https://issues.apache.org/jira/browse/KAFKA-15279
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> The protocol introduces three new RPCs that the client uses to communicate 
> with the broker:
>  # 
> [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]
>  # 
> [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI]
>  # 
> [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI]
> Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to 
> implement the ConsumerGroupAssignmentRequestManager to handle the second and 
> third RPCs on the above list.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.





Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600263287


##
clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class Lz4Compression implements Compression {
+
+    public static final int MIN_LEVEL = 1;
+    public static final int MAX_LEVEL = 17;
+    public static final int DEFAULT_LEVEL = 9;

Review Comment:
   I hesitated to define these constants for this reason, but these levels have 
not changed in over 10 years [0], so hopefully this won't require much 
maintenance.
   
   0: 
https://github.com/lz4/lz4-java/blame/master/src/java/net/jpountz/lz4/LZ4Constants.java#L23-L24
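For illustration, the LZ4 constants from the hunk above could be used to resolve and validate a user-supplied level along these lines. The helper class and its method are hypothetical and not part of the PR; only the three constant values come from the hunk.

```java
// Hypothetical helper (not part of the PR) showing how the LZ4 level constants
// from the hunk might be used to validate a user-supplied compression level.
public class Lz4LevelCheck {

    public static final int MIN_LEVEL = 1;
    public static final int MAX_LEVEL = 17;
    public static final int DEFAULT_LEVEL = 9;

    // Returns the default when no level is given; rejects values outside [MIN_LEVEL, MAX_LEVEL].
    public static int resolveLevel(Integer requested) {
        if (requested == null) {
            return DEFAULT_LEVEL;
        }
        if (requested < MIN_LEVEL || requested > MAX_LEVEL) {
            throw new IllegalArgumentException(
                "LZ4 compression level " + requested + " is outside [" + MIN_LEVEL + ", " + MAX_LEVEL + "]");
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(resolveLevel(null));
        System.out.println(resolveLevel(17));
    }
}
```

Keeping the bounds as named constants means that if lz4-java ever changed its supported range, only these three values would need updating.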






[jira] [Commented] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846354#comment-17846354
 ] 

Kirk True commented on KAFKA-16406:
---

[~lianetm]—both PRs are closed. Can this be marked as resolved?

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0




