Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on PR #15840:
URL: https://github.com/apache/kafka/pull/15840#issuecomment-2097507811

   @m1a2st please fix the build error


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16666) Migrate `TransactionLogMessageFormatter`, `GroupMetadataMessageFormatter` and `OffsetsMessageFormatter` to tools module

2024-05-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16666:
---
Description: 
`GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and 
`TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data 
from internal topics. Following the migration plan, we should move them to the 
tools-api module. Also, we need to keep command-line compatibility. That is to 
say, `ConsoleConsumer` should accept the previous package names and then use 
the (Java) implementations to parse and produce the same output.

[0] 
https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
[1] 
https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248
[2] 
https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145

  was:
`GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and 
`TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data 
of internal topics. Following the migration plan, we should move them to tools 
module. Also, we need to keep the compatibility of command line. That is to 
say, `ConsoleConsumer` can accept the previous package name and then use the 
(java) implementation to parse and make same output.

[0] 
https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
[1] 
https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248
[2] 
https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145


> Migrate `TransactionLogMessageFormatter`, `GroupMetadataMessageFormatter` and 
> `OffsetsMessageFormatter` to tools module
> -
>
> Key: KAFKA-16666
> URL: https://issues.apache.org/jira/browse/KAFKA-16666
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: need-kip
>
> `GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and 
> `TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data 
> of internal topics. Following the migration plan, we should move them to 
> tools-api module. Also, we need to keep the compatibility of command line. 
> That is to say, `ConsoleConsumer` can accept the previous package name and 
> then use the (java) implementation to parse and make same output.
> [0] 
> https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
> [1] 
> https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248
> [2] 
> https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145
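
As a rough illustration of the compatibility requirement, the old formatter 
class names could be resolved to the new implementations before instantiation. 
This is only a sketch; the target package and class names in the tools module 
are assumptions, not the actual design:

{code:java}
import java.util.Map;

// Sketch only: alias the old Scala formatter class names to hypothetical
// new tools-module implementations so existing command lines keep working.
final class FormatterAliases {
    private static final Map<String, String> DEPRECATED_TO_NEW = Map.of(
        "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter",
        "org.apache.kafka.tools.consumer.OffsetsMessageFormatter",
        "kafka.coordinator.group.GroupMetadataManager$GroupMetadataMessageFormatter",
        "org.apache.kafka.tools.consumer.GroupMetadataMessageFormatter",
        "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter",
        "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter");

    // Unknown names pass through unchanged, preserving current behaviour.
    static String resolve(String requestedClassName) {
        return DEPRECATED_TO_NEW.getOrDefault(requestedClassName, requestedClassName);
    }
}
{code}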



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array

2024-05-06 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16677:
--

 Summary: Replace ClusterType#ALL and ClusterType#DEFAULT by Array
 Key: KAFKA-16677
 URL: https://issues.apache.org/jira/browse/KAFKA-16677
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" rather than a 
true "type". It seems to me they can be replaced by arrays. For example:

ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT}

ClusterType#DEFAULT -> {}

There are two benefits:

1. An explicit array is more readable than the "ALL" tag.
2. We no longer throw the awkward "exception" when encountering "DEFAULT".
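
A minimal sketch of the resulting annotation usage (the simplified annotation 
and the attribute name `types` below are assumptions for illustration, not the 
actual API):

{code:java}
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

enum Type { ZK, KRAFT, CO_KRAFT }

// Sketch only: a simplified stand-in for the real @ClusterTest annotation.
@Retention(RetentionPolicy.RUNTIME)
@interface ClusterTest {
    Type[] types() default {}; // the empty array plays the old DEFAULT role
}

class ExampleTest {
    // The former ClusterType.ALL becomes an explicit, readable list.
    @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
    void runsOnAllClusterTypes() { }
}
{code}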





Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]

2024-05-06 Thread via GitHub


showuon commented on code in PR #15837:
URL: https://github.com/apache/kafka/pull/15837#discussion_r1591749069


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2822,6 +2822,31 @@ class KafkaApisTest extends Logging {
   () => kafkaApis.handleWriteTxnMarkersRequest(null, 
RequestLocal.withThreadConfinedCaching))
   }
 
+  @Test
+  def requiredAclsNotPresentWriteTxnMarkersThrowsAuthorizationException(): 
Unit = {

Review Comment:
   For this test, it can pass even without this change. Maybe we need a test to 
verify that no exception is thrown when alter cluster is allowed and 
clusterAction is denied. WDYT?






Re: [PR] KAFKA-14885: fix kafka client connect to the broker that offline from… [kafka]

2024-05-06 Thread via GitHub


Stephan14 commented on PR #13531:
URL: https://github.com/apache/kafka/pull/13531#issuecomment-2097337433

   hi @divijvaidya, how can I find out which tests failed?





Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on code in PR #15840:
URL: https://github.com/apache/kafka/pull/15840#discussion_r1591747966


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -172,72 +138,210 @@ public void 
testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
 
 String brokerId = "1";
 adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
+alterOpts = asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
 
 // Add config
-alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "11"), Optional.of(brokerId));
-alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "12"), Optional.empty());
+alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "11"));
+alterAndVerifyConfig(zkClient, Optional.empty(), 
singletonMap("message.max.size", "12"));
 
 // Change config
-alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "13"), Optional.of(brokerId));
-alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "14"), Optional.empty());
+alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "13"));
+alterAndVerifyConfig(zkClient, Optional.empty(), 
singletonMap("message.max.size", "14"));
 
 // Delete config
-deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.of(brokerId));
-deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.empty());
+deleteAndVerifyConfig(zkClient, Optional.of(brokerId), 
singleton("message.max.size"));
+deleteAndVerifyConfig(zkClient, Optional.empty(), 
singleton("message.max.size"));
 
 // Listener configs: should work only with listener name
-alterAndVerifyConfig(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.of(brokerId));
+alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
 assertThrows(ConfigException.class,
-() -> alterConfigWithZk(zkClient, 
Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), 
Optional.of(brokerId)));
+() -> alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("ssl.keystore.location", "/tmp/test.jks")));
 
 // Per-broker config configured at default cluster-level should fail
 assertThrows(ConfigException.class,
-() -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.empty()));
-deleteAndVerifyConfig(zkClient, 
Collections.singleton("listener.name.external.ssl.keystore.location"), 
Optional.of(brokerId));
+() -> alterConfigWithZk(zkClient, Optional.empty(), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")));
+deleteAndVerifyConfig(zkClient, Optional.of(brokerId), 
singleton("listener.name.external.ssl.keystore.location"));
 
 // Password config update without encoder secret should fail
 assertThrows(IllegalArgumentException.class,
-() -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.password", 
"secret"), Optional.of(brokerId)));
+() -> alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.password", "secret")));
 
 // Password config update with encoder secret should succeed and 
encoded password must be stored in ZK
 Map configs = new HashMap<>();
 configs.put("listener.name.external.ssl.keystore.password", "secret");
 configs.put("log.cleaner.threads", "2");
-Map encoderConfigs = 
Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"encoder-secret");
-alterConfigWithZk(zkClient, configs, Optional.of(brokerId), 
encoderConfigs);
+Map encoderConfigs = 
singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+alterConfigWithZk(zkClient, Optional.of(brokerId), configs, 
encoderConfigs);
 Properties brokerConfigs = zkClient.getEntityConfigs("brokers", 
brokerId);
-
assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
 "Encoder secret stored in ZooKeeper");
+assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), 
"Encoder secret stored in ZooKeeper");
 assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); 
// not encoded
 String encodedPassword = 

Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on code in PR #15873:
URL: https://github.com/apache/kafka/pull/15873#discussion_r1591722419


##
core/src/test/java/kafka/admin/ConfigCommandUnitTest.java:
##
@@ -410,6 +448,430 @@ public void testOptionEntityTypeNames() {
 doTestOptionEntityTypeNames(false);
 }
 
+@Test
+public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "client", "--entity-type", "not-recognised", 
"--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfUnrecognisedEntityType() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "client", "--entity-type", "not-recognised", 
"--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfBrokerEntityTypeIsNotAnInteger() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void 
shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A", "--entity-type", "users", "--client", "B", 
"--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfMixedEntityTypeFlags() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A", "--entity-type", "users", "--client", "B", 
"--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfInvalidHost() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfInvalidHostUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfUnresolvableHost() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "RFC2606.invalid", "--entity-type", "ips", 

Re: [PR] MINOR: Various cleanups in clients tests [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on code in PR #15877:
URL: https://github.com/apache/kafka/pull/15877#discussion_r1591714910


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1323,7 +1323,7 @@ public void 
testConcurrentUpdateAndFetchForSnapshotAndCluster() throws Interrupt
 } else { // Thread to read metadata snapshot, once its updated
 try {
 if (!atleastMetadataUpdatedOnceLatch.await(5, 
TimeUnit.MINUTES)) {

Review Comment:
   How about using `assertDoesNotThrow`? For example:
   ```java
   assertTrue(assertDoesNotThrow(() -> 
atleastMetadataUpdatedOnceLatch.await(5, TimeUnit.MINUTES)),
   "Test had to wait more than 5 minutes, something 
went wrong.");
   ```



##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1335,7 +1335,7 @@ public void 
testConcurrentUpdateAndFetchForSnapshotAndCluster() throws Interrupt
 });
 }
 if (!allThreadsDoneLatch.await(5, TimeUnit.MINUTES)) {

Review Comment:
   How about using `assertTrue`?
   ```java
   assertTrue(allThreadsDoneLatch.await(5, TimeUnit.MINUTES), "Test had to wait 
more than 5 minutes, something went wrong.");
   ```



##
clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java:
##
@@ -220,9 +220,7 @@ public void 
shouldThrowNpeWhenAddingCollectionWithNullHeader() {
 
 private int getCount(Headers headers) {
 int count = 0;
-Iterator headerIterator = headers.iterator();
-while (headerIterator.hasNext()) {
-headerIterator.next();
+for (Header ignore : headers) {

Review Comment:
   How about using `toArray`? for example: `headers.toArray().length`
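
   For instance, the whole helper could collapse to this sketch (using the 
`Headers`/`Header` types already imported in this file):
   ```java
   private int getCount(Headers headers) {
       // Headers#toArray materializes the headers once; its length is the count.
       return headers.toArray().length;
   }
   ```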



##
clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java:
##
@@ -78,34 +78,30 @@ public static void main(String[] args) throws Exception {
 final Time time = Time.SYSTEM;
 final AtomicBoolean done = new AtomicBoolean(false);
 final Object lock = new Object();

Review Comment:
   Maybe we should just delete this old class ...



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -106,10 +103,9 @@ public void setup() {
 commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
 offsetsRequestManager = testBuilder.offsetsRequestManager;
 coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-memberhipsManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
+HeartbeatRequestManager heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);

Review Comment:
   The variables `heartbeatRequestManager` and `membershipManager` are unused. 
Are they used to test the existence of `heartbeatRequestManager` and 
`membershipManager`? If so, could we rewrite them by `assertTrue`? For example: 
`assertTrue(testBuilder.heartbeatRequestManager.isPresent());`






Re: [PR] MINOR: Remove dev_version parameter from streams tests [kafka]

2024-05-06 Thread via GitHub


mjsax commented on PR #15874:
URL: https://github.com/apache/kafka/pull/15874#issuecomment-2097231440

   Thanks for the PR. Makes sense. What I don't fully understand is what you 
mean by `This simplifies testing downstream, since the test parameters do not 
change with every version.`
   
   > In particular, some tests downstream are blacklisted because they do not 
work with ARM. These lists need to be updated every time DEV_VERSION is bumped.
   
   Seems this does not change after this PR?





Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1591692467


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -158,12 +162,24 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
 .setName(annot.name().trim().isEmpty() ? null : annot.name())
 .setListenerName(annot.listener().trim().isEmpty() ? null : 
annot.listener())
 .setServerProperties(serverProperties)
+.setPerServerProperties(perServerProperties)
 .setSecurityProtocol(annot.securityProtocol())
 .setMetadataVersion(annot.metadataVersion())
 .build();
 type.invocationContexts(context.getRequiredTestMethod().getName(), 
config, testInvocations);
 }
 
+private void processClusterConfigProperty(ClusterConfigProperty property,

Review Comment:
   Please remove this unused function.






Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2097161445

   @frankvicky Could you please rebase code to trigger QA again?





[jira] [Resolved] (KAFKA-16470) kafka-dump-log --offsets-decoder should support new records

2024-05-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16470.

Fix Version/s: 3.8.0
   Resolution: Fixed

> kafka-dump-log --offsets-decoder should support new records
> ---
>
> Key: KAFKA-16470
> URL: https://issues.apache.org/jira/browse/KAFKA-16470
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.8.0
>
>






Re: [PR] KAFKA-16470; kafka-dump-log --offsets-decoder should support new records [kafka]

2024-05-06 Thread via GitHub


chia7712 merged PR #15652:
URL: https://github.com/apache/kafka/pull/15652





[jira] [Updated] (KAFKA-16666) Migrate `TransactionLogMessageFormatter`, `GroupMetadataMessageFormatter` and `OffsetsMessageFormatter` to tools module

2024-05-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16666:
---
Labels: need-kip  (was: )

> Migrate `TransactionLogMessageFormatter`, `GroupMetadataMessageFormatter` and 
> `OffsetsMessageFormatter` to tools module
> -
>
> Key: KAFKA-16666
> URL: https://issues.apache.org/jira/browse/KAFKA-16666
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: need-kip
>
> `GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and 
> `TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data 
> of internal topics. Following the migration plan, we should move them to 
> tools module. Also, we need to keep the compatibility of command line. That 
> is to say, `ConsoleConsumer` can accept the previous package name and then 
> use the (java) implementation to parse and make same output.
> [0] 
> https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
> [1] 
> https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248
> [2] 
> https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2097148532

   @gaurav-narula Could you please rebase code to trigger QA again? It seems we 
have thread leaks in some tests :(





[jira] [Commented] (KAFKA-16539) Can't update specific broker configs in pre-migration mode

2024-05-06 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844071#comment-17844071
 ] 

Chia-Ping Tsai commented on KAFKA-16539:


[~davidarthur] I have pushed it to trunk. Cherry-pick causes some conflicts on 
branch 3.7, so maybe we need to file a PR for backporting. Also, not sure 
whether we need to backport to branch 3.6, because it seems we don't release 
x.x.3 normally.

> Can't update specific broker configs in pre-migration mode
> --
>
> Key: KAFKA-16539
> URL: https://issues.apache.org/jira/browse/KAFKA-16539
> Project: Kafka
>  Issue Type: Bug
>  Components: config, kraft
>Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
>
> In migration mode, ZK brokers will have a forwarding manager configured. This 
> is used to forward requests to the KRaft controller once we get to that part 
> of the migration. However, prior to KRaft taking over as the controller 
> (known as pre-migration mode), the ZK brokers are still attempting to forward 
> IncrementalAlterConfigs to the controller.
> This works fine for cluster level configs (e.g., "-entity-type broker 
> --entity-default"), but this fails for specific broker configs (e.g., 
> "-entity-type broker --entity-id 1").
> This affects BROKER and BROKER_LOGGER config types.
> To workaround this bug, you can either disable migrations on the brokers 
> (assuming no migration has taken place), or proceed with the migration and 
> get to the point where KRaft is the controller.





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


chia7712 merged PR #15744:
URL: https://github.com/apache/kafka/pull/15744





Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]

2024-05-06 Thread via GitHub


m1a2st commented on PR #15840:
URL: https://github.com/apache/kafka/pull/15840#issuecomment-2097144066

   Hello @chia7712, please review. Thank you!





Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]

2024-05-06 Thread via GitHub


m1a2st commented on PR #15840:
URL: https://github.com/apache/kafka/pull/15840#issuecomment-2097142502

   Because the old test only covered ZooKeeper, I changed it to cover both 
ZooKeeper and KRaft and added new tests for KRaft. I mainly test the config 
update modes.





[jira] [Resolved] (KAFKA-16608) AsyncKafkaConsumer doesn't honor interrupted thread status on KafkaConsumer.poll(Duration)

2024-05-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16608.

Fix Version/s: 3.8.0
   Resolution: Fixed

> AsyncKafkaConsumer doesn't honor interrupted thread status on 
> KafkaConsumer.poll(Duration)
> --
>
> Key: KAFKA-16608
> URL: https://issues.apache.org/jira/browse/KAFKA-16608
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Minor
> Fix For: 3.8.0
>
>
> The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in 
> interrupted state is to throw InterruptException. The AsyncKafkaConsumer 
> doesn't do this. It only throws that exception if the interruption occurs 
> while it is waiting.
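
For illustration, the contract described above amounts to something like the 
following sketch (consumer construction omitted):

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;

// Sketch of the expected contract: entering poll() with the interrupt status
// already set should throw InterruptException immediately, even if the call
// would not otherwise block.
static void demonstrate(KafkaConsumer<String, String> consumer) {
    Thread.currentThread().interrupt(); // set interrupt status before polling
    try {
        consumer.poll(Duration.ofMillis(100));
    } catch (InterruptException expected) {
        // expected path for both the classic and the async consumer
    }
}
{code}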





Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-05-06 Thread via GitHub


chia7712 merged PR #15803:
URL: https://github.com/apache/kafka/pull/15803





Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-05-06 Thread via GitHub


gharris1727 commented on code in PR #15469:
URL: https://github.com/apache/kafka/pull/15469#discussion_r1591671051


##
connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
##
@@ -766,135 +852,23 @@ protected static boolean 
canParseSingleTokenLiteral(Parser parser, boolean embed
 protected static SchemaAndValue parse(Parser parser, boolean embedded) 
throws NoSuchElementException {

Review Comment:
   My core difficulty is that the parsing logic and the conversion logic 
mutually depend on one another: 
   1. The convertTo methods check if the input is a String, and then run it 
through the Parser.
   2. After parsing a map or array, the Parser calls convertTo on the elements 
to "cast" them to a common schema
   
   I'm pretty sure convertTo -> parser -> convertTo is a reasonable cycle, and 
should happen all the time via convertToList, convertToMap.
   
   I don't think that parser -> convertTo -> parser is a useful cycle for 
multiple reasons, but proving that is a little bit slippery. With some time I 
think I can break this part of the cycle so that this doesn't end up as one big 
ball of code again.
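
   As a stripped-down illustration of that mutual dependency (shapes only; this 
is not the real Values code):
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   final class ParseConvertCycle {
       // convertTo -> parser: string inputs are run through the parser first.
       static Object convert(Object value) {
           if (value instanceof String && ((String) value).contains(",")) {
               return parseList((String) value);
           }
           return value; // already a structured value; nothing to do
       }

       // parser -> convertTo: each parsed token is handed back to convert()
       // to "cast" elements toward a common schema.
       static List<Object> parseList(String raw) {
           return Arrays.stream(raw.split(","))
                        .map(ParseConvertCycle::convert)
                        .collect(Collectors.toList());
       }
   }
   ```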






[jira] [Resolved] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde

2024-05-06 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16356.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Remove class-name dispatch in RemoteLogMetadataSerde
> 
>
> Key: KAFKA-16356
> URL: https://issues.apache.org/jira/browse/KAFKA-16356
> Project: Kafka
>  Issue Type: Task
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Assignee: Linu Shibu
>Priority: Trivial
>  Labels: newbie
> Fix For: 3.8.0
>
>
> The RemoteLogMetadataSerde#serialize receives a RemoteLogMetadata object, and 
> has to dispatch to one of four serializers depending on its type. This is 
> done by taking the class name of the RemoteLogMetadata and looking it up in 
> maps to find the corresponding serializer for that class.
> This later requires an unchecked cast, because the RemoteLogMetadataTransform 
> is generic. This is all type-unsafe, and can be replaced with type-safe 
> if-elseif-else statements that may also be faster than the double-indirect 
> map lookups.
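
A sketch of the intended type-safe dispatch (the serde field names below are 
assumed for illustration; the metadata subtypes are the existing tiered-storage 
classes):

{code:java}
// Sketch: instanceof checks replace the class-name map lookup and the
// unchecked cast. The *Serde fields are hypothetical stand-ins.
byte[] serialize(RemoteLogMetadata metadata) {
    if (metadata instanceof RemoteLogSegmentMetadata) {
        return segmentSerde.serialize((RemoteLogSegmentMetadata) metadata);
    } else if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
        return segmentUpdateSerde.serialize((RemoteLogSegmentMetadataUpdate) metadata);
    } else if (metadata instanceof RemotePartitionDeleteMetadata) {
        return partitionDeleteSerde.serialize((RemotePartitionDeleteMetadata) metadata);
    } else {
        throw new IllegalArgumentException("Unsupported metadata type: " + metadata.getClass());
    }
}
{code}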





Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-05-06 Thread via GitHub


gharris1727 commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2097110125

   Thanks @linu-shibu for your patience, I was waiting for something else.





Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-05-06 Thread via GitHub


gharris1727 merged PR #15620:
URL: https://github.com/apache/kafka/pull/15620





[jira] [Commented] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread sanghyeok An (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844063#comment-17844063
 ] 

sanghyeok An commented on KAFKA-16670:
--

Hi, [~lianetm]! Thanks for your comments. Your description is clear, and I've 
understood it!

 

So based on those expectations and back to your example, we don't need to wait 
before calling subscribe (that's handled internally by the 
HeartbeatRequestManager as described above). I wonder if it's the fact that in 
the failed case you're polling 10 times only (vs. 100 times in the successful 
case)?? In order to receive records, we do need to make sure that we are 
calling poll after the assignment has been received (so the consumer issues a 
fetch request for the partitions assigned). Note that even when you poll for 1s 
in your test, a poll that happens before the assignment has been received, will 
block for 1s but it's doomed to return empty, because it is not waiting for 
records from the topics you're interested in (no partitions assigned yet). 
Could you make sure that the test is calling poll after the assignment has been 
received? (I would suggest just polling while true for a certain amount of 
time, no sleeping after the subscribe needed).
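
A bounded poll loop along the lines suggested above might look like this sketch 
(consumer construction and topic subscription omitted; the timeout values are 
placeholders):

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch: poll() itself drives coordinator discovery and assignment in the
// background, so polling in a loop for a bounded time is enough.
static ConsumerRecords<String, String> pollUntilRecords(
        KafkaConsumer<String, String> consumer, Duration maxWait) {
    long deadlineMs = System.currentTimeMillis() + maxWait.toMillis();
    while (System.currentTimeMillis() < deadlineMs) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        if (!records.isEmpty()) {
            return records; // non-empty records imply the assignment arrived
        }
    }
    throw new IllegalStateException("No records received within " + maxWait);
}
{code}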

 

Sorry for the confusion. 

I intended to try it 100 times for both the failure and success cases, but the 
code was set to attempt only 10 times in the failure case. Anyway, as you 
suggested, I proceeded by logging the {{poll()}} attempts. 

!image-2024-05-07-08-34-06-855.png|width=721,height=269!
 # The consumer calls {{poll()}} up to 1000 times.
 # The consumer logs ("i : " + i) on each attempt. 
 # If the consumer succeeds in polling non-empty records, it counts down the 
countDownLatch; then we can check whether the countDownLatch reached 0. 

!image-2024-05-07-08-36-40-656.png|width=848,height=315!

I waited until poll() had been called 430 times, which means the consumer 
waited about 430 seconds for its assignment. 

However, the consumer still had not received its assignment. 

!image-2024-05-07-08-38-27-753.png|width=1654,height=289!

However, after receiving the initial FindCoordinator request, the broker does 
not perform any further action. Please see the log above.

The broker has no log entries after 2024-05-06 23:29:27, but by the time of 
the 430th attempt it was already 2024-05-06 23:39:00. 

 

Anyway, it seems that at least one of the consumer or the broker has a 
potential issue.

What do you think? 

> KIP-848 : Consumer will not receive assignment forever because of concurrent 
> issue.
> ---
>
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
> Attachments: image-2024-05-07-08-34-06-855.png, 
> image-2024-05-07-08-36-22-983.png, image-2024-05-07-08-36-40-656.png, 
> image-2024-05-07-08-38-27-753.png
>
>
> *Related Code*
>  * Consumer get assignment Successfully :
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
>  * Consumer get be stuck Forever because of concurrent issue:
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]
>  
> *Unexpected behaviour*
>  * 
> Broker is sufficiently slow.
>  * When a KafkaConsumer is created and immediately subscribes to a topic
> If both conditions are met, {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and become stuck indefinitely.
> In case of new broker and new consumer, when consumer are created, consumer 
> background thread send a request to broker. (I guess groupCoordinator 
> Heartbeat request). In that time, if broker does not load metadata from 
> {{{}__consumer_offset{}}}, broker will start to schedule load metadata. After 
> broker load metadata completely, consumer background thread think 'this 
> broker is valid group coordinator'.
> However, consumer can send {{subscribe}} request to broker before {{broker}} 
> reply about {{{}groupCoordinator HeartBeat Request{}}}. In that case, 
> consumer seems to be stuck.
> If both conditions are met, the {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and may become indefinitely stuck. In the case 
> of a new {{broker}} and new {{{}consumer{}}}, when the consumer is created, 
> {{consumer background thread}} start to send a request to the broker. (I 
> believe this is a {{{}GroupCoordinator Heartbeat request{}}}) During this 
> time, if the {{broker}} has not yet loaded metadata from 
> {{{}__consumer_offsets{}}}, it will begin to schedule metadata loading. Once 
> the broker has completely loaded the metadata, 

[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16670:
-
Attachment: image-2024-05-07-08-38-27-753.png

> KIP-848 : Consumer will not receive assignment forever because of concurrent 
> issue.
> ---
>
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
> Attachments: image-2024-05-07-08-34-06-855.png, 
> image-2024-05-07-08-36-22-983.png, image-2024-05-07-08-36-40-656.png, 
> image-2024-05-07-08-38-27-753.png
>
>
> *Related Code*
>  * Consumer get assignment Successfully :
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
>  * Consumer get be stuck Forever because of concurrent issue:
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]
>  
> *Unexpected behaviour*
>  * 
> Broker is sufficiently slow.
>  * When a KafkaConsumer is created and immediately subscribes to a topic
> If both conditions are met, {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and become stuck indefinitely.
> In case of new broker and new consumer, when consumer are created, consumer 
> background thread send a request to broker. (I guess groupCoordinator 
> Heartbeat request). In that time, if broker does not load metadata from 
> {{{}__consumer_offset{}}}, broker will start to schedule load metadata. After 
> broker load metadata completely, consumer background thread think 'this 
> broker is valid group coordinator'.
> However, consumer can send {{subscribe}} request to broker before {{broker}} 
> reply about {{{}groupCoordinator HeartBeat Request{}}}. In that case, 
> consumer seems to be stuck.
> If both conditions are met, the {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and may become indefinitely stuck. In the case 
> of a new {{broker}} and new {{{}consumer{}}}, when the consumer is created, 
> {{consumer background thread}} start to send a request to the broker. (I 
> believe this is a {{{}GroupCoordinator Heartbeat request{}}}) During this 
> time, if the {{broker}} has not yet loaded metadata from 
> {{{}__consumer_offsets{}}}, it will begin to schedule metadata loading. Once 
> the broker has completely loaded the metadata, the {{consumer background 
> thread}} recognizes this broker as a valid group coordinator. However, there 
> is a possibility that the {{consumer}} can send a {{subscribe request}} to 
> the {{broker}} before the {{broker}} has replied to the {{{}GroupCoordinator 
> Heartbeat Request{}}}. In such a scenario, the {{consumer}} appears to be 
> stuck.
>  
> You can check this scenario, in the 
> {{{}src/test/java/com/example/MyTest#should_fail_because_consumer_try_to_poll_before_background_thread_get_valid_coordinator{}}}.
>  If there is no sleep time to wait {{{}GroupCoordinator Heartbeat 
> Request{}}}, {{consumer}} will be always stuck. If there is a little sleep 
> time, {{consumer}} will always receive assignment.
>  
> README : 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/main/README.md]
>  
> In my case, the consumer gets its assignment under `docker-compose`: it is not 
> slow enough. 
> However, the consumer cannot get an assignment under `testcontainers` without a 
> little waiting time: it is slow enough to cause the concurrency issue. 
> `testcontainers` is Docker in Docker, thus `testcontainers` will be slower 
> than `docker-compose`. 





[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16670:
-
Attachment: image-2024-05-07-08-36-22-983.png

> KIP-848 : Consumer will not receive assignment forever because of concurrent 
> issue.
> ---
>
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
> Attachments: image-2024-05-07-08-34-06-855.png, 
> image-2024-05-07-08-36-22-983.png, image-2024-05-07-08-36-40-656.png
>





[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16670:
-
Attachment: image-2024-05-07-08-36-40-656.png

> KIP-848 : Consumer will not receive assignment forever because of concurrent 
> issue.
> ---
>
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
> Attachments: image-2024-05-07-08-34-06-855.png, 
> image-2024-05-07-08-36-22-983.png, image-2024-05-07-08-36-40-656.png
>





[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16670:
-
Attachment: image-2024-05-07-08-34-06-855.png

> KIP-848 : Consumer will not receive assignment forever because of concurrent 
> issue.
> ---
>
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
> Attachments: image-2024-05-07-08-34-06-855.png
>





Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-05-06 Thread via GitHub


gharris1727 commented on code in PR #15469:
URL: https://github.com/apache/kafka/pull/15469#discussion_r1591636980


##
connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
##
@@ -766,135 +852,23 @@ protected static boolean 
canParseSingleTokenLiteral(Parser parser, boolean embed
 protected static SchemaAndValue parse(Parser parser, boolean embedded) 
throws NoSuchElementException {

Review Comment:
   Parser was protected, so I think it's still safe to refactor. The class 
doesn't show up here: 
https://javadoc.io/doc/org.apache.kafka/connect-api/latest/index.html
   
   I moved the existing Parser to Tokenizer, as it had a good interface 
already, and adding methods would just be clutter. The methods which took a 
Parser argument have now been moved to a single toplevel class named Parser. 
Both of these classes are package-local, so shouldn't appear in the API docs.
   
   I left almost all of the private/protected static methods in Values, just 
bringing a few over that were only ever called by the Parser. I tried moving 
things from Values to Parser to break the circular dependency, but this 
required moving nearly everything to Parser. The two classes are really 
intertwined, and I'm not really satisfied with this refactor now.






Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]

2024-05-06 Thread via GitHub


TaiJuWu commented on code in PR #15862:
URL: https://github.com/apache/kafka/pull/15862#discussion_r1591630803


##
core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:
##
@@ -33,16 +31,22 @@ public class ClusterTestExtensionsUnitTest {
 void testProcessClusterTemplate() {
 ClusterTestExtensions ext = new ClusterTestExtensions();
 ExtensionContext context = mock(ExtensionContext.class);
-Consumer testInvocations = 
mock(Consumer.class);
 ClusterTemplate annot = mock(ClusterTemplate.class);
-when(annot.value()).thenReturn("").thenReturn(" ");
+when(annot.value()).thenReturn("").thenReturn(" 
").thenReturn("test_empty_config");

Review Comment:
   @soarez Thanks for your comments. All is very excellent!!! Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-05-06 Thread via GitHub


gharris1727 commented on code in PR #15469:
URL: https://github.com/apache/kafka/pull/15469#discussion_r1591616004


##
connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
##
@@ -177,7 +213,12 @@ public static Long convertToLong(Schema schema, Object 
value) throws DataExcepti
  * @throws DataException if the value could not be converted to a float
  */
 public static Float convertToFloat(Schema schema, Object value) throws 
DataException {

Review Comment:
   I added tests to get the methods themselves up to 100% coverage, but the 
overall class still is missing some coverage. Thanks for pointing this out, as 
there were certainly some pretty obvious cases that weren't tested.
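   
   For example, coverage-style tests of this kind could look like the following (a 
hedged sketch; the class name and exact cases are illustrative, not the PR's tests):
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertThrows;
   
   import org.apache.kafka.connect.data.Values;
   import org.apache.kafka.connect.errors.DataException;
   import org.junit.jupiter.api.Test;
   
   class ValuesConvertToFloatExampleTest {
       @Test
       void shouldConvertNumericsAndRejectNonNumerics() {
           assertEquals(1.5f, Values.convertToFloat(null, 1.5d).floatValue()); // numeric narrowing
           assertEquals(42.0f, Values.convertToFloat(null, 42).floatValue());  // integer input
           assertThrows(DataException.class, () -> Values.convertToFloat(null, new Object()));
       }
   }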



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-06 Thread via GitHub


junrao commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1589798416


##
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.ControlRecord;
+import org.apache.kafka.raft.Isolation;
+import org.apache.kafka.raft.LogFetchInfo;
+import org.apache.kafka.raft.ReplicatedLog;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+/**
+ * The KRaft state machine for tracking control records in the topic partition.
+ *
+ * This type keeps track of changes to the finalized kraft.version and the 
sets of voters between
+ * the latest snapshot and the log end offset.
+ *
+ * The are two actors/threads for this type. One is the KRaft driver which 
indirectly call a lot of

Review Comment:
   The are => There are



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##
@@ -728,83 +730,114 @@ public static MemoryRecords withLeaderChangeMessage(
 ByteBuffer buffer,
 LeaderChangeMessage leaderChangeMessage
 ) {
-writeLeaderChangeMessage(buffer, initialOffset, timestamp, 
leaderEpoch, leaderChangeMessage);
-buffer.flip();
-return MemoryRecords.readableRecords(buffer);
+try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+initialOffset,
+timestamp,
+leaderEpoch,
+buffer
+)
+) {
+builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+return builder.build();
+}
 }
 
-private static void writeLeaderChangeMessage(
-ByteBuffer buffer,
+public static MemoryRecords withSnapshotHeaderRecord(
 long initialOffset,
 long timestamp,
 int leaderEpoch,
-LeaderChangeMessage leaderChangeMessage
+ByteBuffer buffer,
+SnapshotHeaderRecord snapshotHeaderRecord
 ) {
-try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-TimestampType.CREATE_TIME, initialOffset, timestamp,
-RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
-false, true, leaderEpoch, buffer.capacity())
+try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+initialOffset,
+timestamp,
+leaderEpoch,
+buffer
+)
 ) {
-builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+builder.appendSnapshotHeaderMessage(timestamp, 
snapshotHeaderRecord);
+return builder.build();
 }
 }
 
-public static MemoryRecords withSnapshotHeaderRecord(
+public static MemoryRecords withSnapshotFooterRecord(
 long initialOffset,
 long timestamp,
 int leaderEpoch,
 ByteBuffer buffer,
-SnapshotHeaderRecord snapshotHeaderRecord
+SnapshotFooterRecord snapshotFooterRecord
 ) {
-writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, 
leaderEpoch, snapshotHeaderRecord);
-buffer.flip();
-return MemoryRecords.readableRecords(buffer);
+try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+initialOffset,
+timestamp,
+leaderEpoch,
+buffer
+)
+) {
+

Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-05-06 Thread via GitHub


linu-shibu commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2096976398

   @gharris1727 any update on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843887#comment-17843887
 ] 

Lianet Magrans edited comment on KAFKA-16670 at 5/6/24 9:01 PM:


Hey [~chickenchickenlove], thanks for trying this out! Some clarification in 
case it helps. In the flow you described, the new consumer will send a request 
to find the group coordinator (FindCoordinator) when it gets created, but even 
if there's a call to consumer.subscribe right after, it won't send a request to 
subscribe (HeartbeatRequest) until it gets a response to the initial 
FindCoordinator request (HeartbeatManager skips sending requests if it does not 
know the coordinator 
[here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]).
 Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest 
request will be sent containing the new subscription. The consumer will then 
eventually receive the assignment, but we don't know exactly when from the 
consumer point of view. The rebalance callbacks are what signal to the consumer 
that the call to subscribe completed with an assignment received. So it's only 
after the consumer gets the assignment that a call to poll can return the 
records that are available. 

So based on those expectations and back to your example, we don't need to wait 
before calling subscribe (that's handled internally by the 
HeartbeatRequestManager as described above). I wonder if it's the fact that in 
the failed case you're polling 10 times only (vs. 100 times in the successful 
case)?? In order to receive records, we do need to make sure that we are 
calling poll after the assignment has been received (so the consumer issues a 
fetch request for the partitions assigned). Note that even when you poll for 1s 
in your test, a poll that happens before the assignment has been received, will 
block for 1s but it's doomed to return empty, because it is not waiting for 
records from the topics you're interested in (no partitions assigned yet). 
Could you make sure that the test is calling poll after the assignment has been 
received? (I would suggest just polling while true for a certain amount of 
time, no sleeping after the subscribe needed). 

This integration test for the consumer 
[testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153]
 has a very similar logic to the one you're trying to achieve (create consumer, 
subscribe right away and consume), and since a new broker and consumer are 
setup for each test, the test will go down the same path of having to find a 
coordinator before sending the HeartbeatRequest with a subscription. The main 
difference from looking at both seems to be the limited number of polls in your 
failed test scenario, so let's try to rule that out to better isolate the 
situation. Hope it helps! Let me know
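
For illustration, a minimal sketch of that poll-until-assigned approach (the topic 
name and timeout are illustrative; this is not the actual test code):
{code:java}
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class PollUntilAssigned {
    // Poll in a loop for a bounded time instead of sleeping after subscribe.
    static ConsumerRecords<String, String> consumeOnce(Consumer<String, String> consumer, long timeoutMs) {
        consumer.subscribe(Collections.singletonList("my-topic"));
        long deadline = System.currentTimeMillis() + timeoutMs;
        ConsumerRecords<String, String> records = ConsumerRecords.empty();
        while (records.isEmpty() && System.currentTimeMillis() < deadline) {
            records = consumer.poll(Duration.ofSeconds(1)); // empty until the assignment arrives
        }
        return records;
    }
}
{code}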


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], thanks for trying this out! Some clarification in 
case it helps. In the flow you described, the new consumer will send a request 
to find the group coordinator (FindCoordinator) when it gets created, but even 
if there's a call to consumer.subscribe right after, it won't send a request to 
subscribe until it gets a response to the initial FindCoordinator request 
(HeartbeatManager skips sending requests if it does not know the coordinator 
[here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]).
 Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest 
request will be sent containing the new subscription. The consumer will then 
eventually receive the assignment, but we don't know exactly when from the 
consumer point of view. The rebalance callbacks are what signal to the consumer 
that the call to subscribe completed with an assignment received. So it's only 
after the consumer gets the assignment that a call to poll can return the 
records that are available. 

So based on those expectations and back to your example, we don't need to wait 
before calling subscribe (that's handled internally by the 
HeartbeatRequestManager as described above). I wonder if it's the fact that in 
the failed case you're polling 10 times only (vs. 100 times in the successful 
case)?? In order to receive records, we do need to make sure that we are 
calling poll after the assignment has been received (so the consumer issues a 
fetch request for the partitions assigned). Note that even when you poll for 1s 
in your test, a poll that happens before the assignment has been received, will 
block for 1s but it's 

[jira] [Commented] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843887#comment-17843887
 ] 

Lianet Magrans commented on KAFKA-16670:


Hey [~chickenchickenlove], thanks for trying this out! Some clarification in 
case it helps. In the flow you described, the new consumer will send a request 
to find the group coordinator (FindCoordinator) when it gets created, but even 
if there's a call to consumer.subscribe right after, it won't send a request to 
subscribe until it gets a response to the initial FindCoordinator request 
(HeartbeatManager skips sending requests if it does not know the coordinator 
[here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]).
 Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest 
request will be sent containing the new subscription. The consumer will then 
eventually receive the assignment, but we don't know exactly when from the 
consumer point of view. The rebalance callbacks are what signal to the consumer 
that the call to subscribe completed with an assignment received. So it's only 
after the consumer gets the assignment that a call to poll can return the 
records that are available. 

So based on those expectations and back to your example, we don't need to wait 
before calling subscribe (that's handled internally by the 
HeartbeatRequestManager as described above). I wonder if it's the fact that in 
the failed case you're polling 10 times only (vs. 100 times in the successful 
case)?? In order to receive records, we do need to make sure that we are 
calling poll after the assignment has been received (so the consumer issues a 
fetch request for the partitions assigned). Note that even when you poll for 1s 
in your test, a poll that happens before the assignment has been received, will 
block for 1s but it's doomed to return empty, because it is not waiting for 
records from the topics you're interested in (no partitions assigned yet). 
Could you make sure that the test is calling poll after the assignment has been 
received? (I would suggest just polling while true for a certain amount of 
time, no sleeping after the subscribe needed). 

This integration test for the consumer 
[testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153]
 has a very similar logic to the one you're trying to achieve (create consumer, 
subscribe right away and consume), and since a new broker and consumer are 
set up for each test, the test will go down the same path of having to find a 
coordinator before sending the HeartbeatRequest with a subscription. The main 
difference from looking at both seems to be the limited number of polls in your 
failed test scenario, so let's try to rule that out to better isolate the 
situation. Hope it helps! Let me know

> KIP-848 : Consumer will not receive assignment forever because of concurrent 
> issue.
> ---
>
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
>
> *Related Code*
>  * Consumer get assignment Successfully :
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
>  * Consumer get be stuck Forever because of concurrent issue:
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]
>  
> *Unexpected behaviour*
>  * Broker is sufficiently slow.
>  * When a KafkaConsumer is created and immediately subscribes to a topic
> If both conditions are met, {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and become stuck indefinitely.
> In the case of a new broker and a new consumer, when the consumer is created, the 
> consumer background thread sends a request to the broker. (I guess a groupCoordinator 
> Heartbeat request.) At that time, if the broker has not yet loaded metadata from 
> {{{}__consumer_offset{}}}, the broker will start to schedule the metadata load. After the 
> broker loads the metadata completely, the consumer background thread treats this 
> broker as a valid group coordinator.
> However, the consumer can send a {{subscribe}} request to the broker before the {{broker}} 
> replies to the {{{}groupCoordinator HeartBeat Request{}}}. In that case, the 
> consumer appears to be stuck.
> If both conditions are met, the {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and may become indefinitely stuck. In the case 
> 

Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2096872249

   [Latest test 
run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15744/13/tests)
 looks pretty reasonable. The failures all look unrelated 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14237) Kafka TLS Doesn't Present Intermediary Certificates when using PEM

2024-05-06 Thread Gaurav Narula (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843869#comment-17843869
 ] 

Gaurav Narula commented on KAFKA-14237:
---

Hi [~soxcks] 

I'm unable to reproduce this issue on 3.7. Here's the self-signed cert chain I 
used for checking locally:
{code:java}
-----BEGIN CERTIFICATE-----
MIID2DCCAsCgAwIBAgICEAAwDQYJKoZIhvcNAQELBQAwZTELMAkGA1UEBhMCVVMx
DjAMBgNVBAgMBVN0YXRlMREwDwYDVQQHDAhMb2NhdGlvbjEPMA0GA1UECgwGQXBh
Y2hlMQ4wDAYDVQQLDAVLYWZrYTESMBAGA1UEAwwJVGVzdCBSb290MB4XDTI0MDUw
NjIwMjEyM1oXDTM0MDUwNDIwMjEyM1owVjEWMBQGA1UEAwwNVGVzdCBCcm9rZXIg
MTEOMAwGA1UECAwFU3RhdGUxCzAJBgNVBAYTAlVTMQ8wDQYDVQQKDAZBcGFjaGUx
DjAMBgNVBAsMBUthZmthMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA
2IfbD26ChT2+/hhPijvk1OIzglDgj+YKZ7Uj01cR0nOg7STIcmIL1D5BuUljtrEq
xPqgXDOZIXn1526OfRK+u+1+Adw9mtclM1pnZkgAH2EkgAND/L4NLq07NfC3jOBe
vF6UiP1Yg6KoLJAit96y5HOrlXm0hLd6MRDEgHnDtnzDPhMtV03a+JXFAbhfRENq
nu/a6hkbodHMh697eSHqifahCpPqq6WraLk43u5P8jzdq7sm8GIjAKaGlkdbCib5
gW6W8ChHQ8fNchKKH1WuAazQeO6X2CGvt0B0JhUX5UsP83Tfqojfgi3MSggOAIDQ
Ll7C2eK0XMG1e+qagI65SwIDAQABo4GgMIGdMAkGA1UdEwQCMAAwCwYDVR0PBAQD
AgH2MCcGA1UdJQQgMB4GCCsGAQUFBwMBBggrBgEFBQcDAgYIKwYBBQUHAwgwHQYD
VR0OBBYEFPAi9B9Dj5WmLBfnu8yEbH7KLW01MBoGA1UdEQQTMBGHBH8AAAGCCWxv
Y2FsaG9zdDAfBgNVHSMEGDAWgBTrXaBrjpP8LHcbfhOcrWvo4+otxjANBgkqhkiG
9w0BAQsFAAOCAQEAlPyEgC4egJIajAD2PIeeyXS/eyv43kkNTqsJL9NBepG2njM+
yK1GV1jWwFfoe7IYMtcOme+1tuoNwXUl7gM2l8KRVSue4QLkDo604JHPmvnfqDoh
MxOhC2dw96Kh69NgnlR3Ajp/Dg/kPDG2FOL3lowISVhNzTQDr773f7n80CxbgRmq
IrlQ/S/j7tF5K1BB8WOinZZUhiUO/TuCmUROK8NKCAIAI5F7c6AbrlbASGFHsRLU
Bgrdq9x4X0LVt8GzgwOPk2lpYvC3oggUgig0DqDraJRsFJ0sbxGEZhoftgNzb1Bg
jDLHahebah4Cd1csKeI/v8a/r5d4LJ/JrYTv5g==
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIDvTCCAqWgAwIBAgIUAzMUkUfTFIln8o4Qi1wHNcixmEcwDQYJKoZIhvcNAQEL
BQAwZTELMAkGA1UEBhMCVVMxDjAMBgNVBAgMBVN0YXRlMREwDwYDVQQHDAhMb2Nh
dGlvbjEPMA0GA1UECgwGQXBhY2hlMQ4wDAYDVQQLDAVLYWZrYTESMBAGA1UEAwwJ
VGVzdCBSb290MB4XDTI0MDUwNjIwMjEyM1oXDTM0MDUwNDIwMjEyM1owZTELMAkG
A1UEBhMCVVMxDjAMBgNVBAgMBVN0YXRlMREwDwYDVQQHDAhMb2NhdGlvbjEPMA0G
A1UECgwGQXBhY2hlMQ4wDAYDVQQLDAVLYWZrYTESMBAGA1UEAwwJVGVzdCBSb290
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzuz4dYs7/CqkhHVXO8Zz
+1/avBmgXVHgrVMV0njqqtGXH5fHaKQgrmO+dKHjy/brcQbZ+vbctq3F37OdpYwk
pIepKYqEdT9yaR8Eb1m3iWnk3cwwz+QvVwYMHMncOfoMooDvb5jwb1bpsMovsdwe
NvvY3LtoUF4POIGCH79KmwOSJDDpnixVeZHIYHAyhxAE0LM4xEAinmuvp6t1pgtZ
1urid/uZvk/JbWnp+WB1dr7jVGih6dqbjDBfyoI+3APgxiVMySZYTL3kPbE8aJYD
tOXDOO8+0+g+7sGSOPrTF5LsGyE/CDd4lbx4T5mQpavm2iRmuGckXLtBRGJ3xODN
fwIDAQABo2UwYzAMBgNVHRMEBTADAQH/MAsGA1UdDwQEAwIB9jAnBgNVHSUEIDAe
BggrBgEFBQcDAQYIKwYBBQUHAwIGCCsGAQUFBwMIMB0GA1UdDgQWBBTrXaBrjpP8
LHcbfhOcrWvo4+otxjANBgkqhkiG9w0BAQsFAAOCAQEATDJ9+6qQat6V3Bbm0kWk
L+xy2ETefq9ctT4smXLatkUmtiMs/+ZM762iT3QRGC2kKgK2GITucwiemsUR3NkY
V+Y9iqFIkZkdhCfBQB6SAcXhYV5ucBTga0jGE0awEedLEQ6ow/9iUKCfXvH82dWK
t38GFHjrqv6gXrGoJKNYUDYVukZnyLWkwd2LD92AXNJPadaWswVJhve/aWkPVSXo
f2E/wMG/euP3ulDyzLBE4jrx01rn+nxVVN2zm61mcrTovSu+mft2EB9E9Qs4BVDk
vas7tbpsS1mijEoCaArtI8M/IPHRLPE2puM89/fn/jnopUNMZyB6MnaeXsTR/vlm
6w==
-----END CERTIFICATE-----
 {code}
The private key:
{code:java}
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDYh9sPboKFPb7+
GE+KO+TU4jOCUOCP5gpntSPTVxHSc6DtJMhyYgvUPkG5SWO2sSrE+qBcM5khefXn
bo59Er677X4B3D2a1yUzWmdmSAAfYSSAA0P8vg0urTs18LeM4F68XpSI/ViDoqgs
kCK33rLkc6uVebSEt3oxEMSAecO2fMM+Ey1XTdr4lcUBuF9EQ2qe79rqGRuh0cyH
r3t5IeqJ9qEKk+qrpatouTje7k/yPN2ruybwYiMApoaWR1sKJvmBbpbwKEdDx81y
EoofVa4BrNB47pfYIa+3QHQmFRflSw/zdN+qiN+CLcxKCA4AgNAuXsLZ4rRcwbV7
6pqAjrlLAgMBAAECggEAFRowZDGd8MRSv8q4vb0WkRS2dmXRbNS7gN3rbuZWa08v
iM0D5/ncM0QZ/afEWwKrK2VNiY7RxqxvJa3lnxTbl88Ob7n6GwQKsRWHbDVqJaS+
/ObUkmYnPLxPP/OEv+sB8JO7IBqorLOGdklZyNegUZlgSIIC8Mg81VlP/UFgrIEQ
VPaVZ/55XCg4jgs6C0uf6NqWrw+BrWziECpV596j+qRHlJErk1DOxuBDSzoZ1Iek
P/5ap3ZhMe7cjrz31HrthdtTxA1rcaW9B4w9V6CZuosgDRcc/b4aqtf6E6PmBtAD
VFGEF94bxwHNZ83b11/kCqreGfeu5Jj99enZ+Ou5CQKBgQDumpEpUWiKk3joUL2l
76Fg3ZZtFdHBqRZpHpnHybt0Q9bRVN/hT+MrDnOuw5rXR9QNbaFCf9KArz9Q/oB/
GZKlSX7GzeeCgHvVpssnUd0imhMckRuvGUekIHnnHEbusmbxyNmdoomOF92+mqus
cpQXavvPGVV8p3WTnJDt0kvcnwKBgQDoUU3lIx6ySd/KH34beBQFfMpG2BQtkGrs
40brRRUaUNsQFjBxjPbiUwPrXV1J/FEiWAy4PLwElcegJTP6YjhNSwWDH0eQA1bs
Ub3AMpk8mU6lgfhsDtm1S7ek0mnuQCgM1oWY89QLWSy5pmmAfOac2By9ILcwPeZD
WB+FXrY31QKBgQDbjT6lVlNrr+dBXYokdit4hm00Uy9/k6cbcxztyaLDiOjSFdcr
6+aMZ+/qj/KaxW1KLeaE2jlIT/li/cwfJ9jYXphZNn4ghzlrjt7Af4OLo1qSnrNq
m0hgrcF993cNjPtM4BPeCQGpziwshwYQ2B2MrtSl7BnNagm2mgqBy1Ai4QKBgElM
ze0MRbUvReL6SMnV+0s38oKjzsoJlRMlKs00wNHKzTOoLKTHO2Zxlvz+Ol8Ls3XI
nkrLLu+rao8G7f2EXAtXLmgOyH+R7i0mJV6tGFhcbsod1goSLXLcbxccJLw9leVn
EkQOOstR2aDB9uvJfOHj9j1eQy5/eVWqSlfEaG35AoGAZtbhiDkVAHk1YBPnvkHd
rX+eQw5rNhqFiWrRJu+9RYjHGLieRL4bIZNpw8S+2cYv3MsPmQ7tWozkxJSR1x84
0OIMpGfClo9v4499TwrNtqrhSBXYdY6EsUBrbYeu54USUEQmclfQuuWIRfdbVfyn
6n6txW70n2EQZC3/I0ECWTc=
-----END PRIVATE KEY-----
{code}
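For completeness, a chain and key like the above can be fed to the broker directly 
through the PEM settings from KIP-651; a minimal sketch (the class and method names 
are illustrative):
{code:java}
import java.util.Properties;

final class PemSslConfigExample {
    // Builds SSL properties from in-memory PEM strings (like the blocks quoted above).
    static Properties sslProps(String certChainPem, String privateKeyPem) {
        Properties props = new Properties();
        props.put("ssl.keystore.type", "PEM");
        props.put("ssl.keystore.certificate.chain", certChainPem); // leaf first, then intermediates
        props.put("ssl.keystore.key", privateKeyPem);
        return props;
    }
}
{code}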
The Root CA:
{code:java}
-----BEGIN CERTIFICATE-----
MIIDvTCCAqWgAwIBAgIUAzMUkUfTFIln8o4Qi1wHNcixmEcwDQYJKoZIhvcNAQEL

[jira] [Updated] (KAFKA-16531) Fix check-quorum calculation to not assume that the leader is in the voter set

2024-05-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16531:
---
Summary: Fix check-quorum calculation to not assume that the leader is in 
the voter set  (was: Fix check-quorum calculation to no assume that the leader 
is in the voter set)

> Fix check-quorum calculation to not assume that the leader is in the voter set
> --
>
> Key: KAFKA-16531
> URL: https://issues.apache.org/jira/browse/KAFKA-16531
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.8.0
>
>
> In the check-quorum calculation, the leader should not assume that it is part 
> of the voter set. This may happen when the leader is removing itself from the 
> voter set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16530) Fix high-watermark calculation to not assume the leader is in the voter set

2024-05-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16530:
---
Summary: Fix high-watermark calculation to not assume the leader is in the 
voter set  (was: Fix high-watermark calculation to no assume the leader is in 
the voter set)

> Fix high-watermark calculation to not assume the leader is in the voter set
> ---
>
> Key: KAFKA-16530
> URL: https://issues.apache.org/jira/browse/KAFKA-16530
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.8.0
>
>
> When the leader is being removed from the voter set, the leader may not be in 
> the voter set. This means that kraft should not assume that the leader is 
> part of the high-watermark calculation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16676) Security docs missing RPCs from KIP-714 and KIP-1000

2024-05-06 Thread Andrew Schofield (Jira)
Andrew Schofield created KAFKA-16676:


 Summary: Security docs missing RPCs from KIP-714 and KIP-1000
 Key: KAFKA-16676
 URL: https://issues.apache.org/jira/browse/KAFKA-16676
 Project: Kafka
  Issue Type: Improvement
  Components: docs
Affects Versions: 3.7.0, 3.8.0
Reporter: Andrew Schofield
Assignee: Andrew Schofield
 Fix For: 3.8.0


KIPs 714 and 1000 introduced 3 new RPCs to do with client metrics. None of them 
was added to the list of RPCs in the security documentation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]

2024-05-06 Thread via GitHub


lianetm commented on code in PR #15408:
URL: https://github.com/apache/kafka/pull/15408#discussion_r1591377324


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala:
##
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.api.{AbstractConsumerTest, BaseConsumerTest}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.stream.Stream
+
+/**
+ * Integration tests for the consumer that cover interaction with the consumer 
from within callbacks
+ * and listeners.
+ */
+class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerRebalanceListenerAssignOnPartitionsAssigned(quorum: String, 
groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsAssigned { (consumer, _) =>
+  val e: Exception = assertThrows(classOf[IllegalStateException], () => 
consumer.assign(Collections.singletonList(tp)))
+  assertEquals(e.getMessage, "Subscription to topics, partitions and 
pattern are mutually exclusive")
+}
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerRebalanceListenerAssignmentOnPartitionsAssigned(quorum: 
String, groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsAssigned { (consumer, _) =>
+  assertTrue(consumer.assignment().contains(tp));
+}
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned(quorum: 
String, groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsAssigned { (consumer, _) =>
+  val map = consumer.beginningOffsets(Collections.singletonList(tp))
+  assertTrue(map.containsKey(tp))
+  assertEquals(0, map.get(tp))
+}
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerRebalanceListenerAssignOnPartitionsRevoked(quorum: String, 
groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsRevoked { (consumer, _) =>
+  val e: Exception = assertThrows(classOf[IllegalStateException], () => 
consumer.assign(Collections.singletonList(tp)))
+  assertEquals(e.getMessage, "Subscription to topics, partitions and 
pattern are mutually exclusive")

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]

2024-05-06 Thread via GitHub


lianetm commented on code in PR #15408:
URL: https://github.com/apache/kafka/pull/15408#discussion_r1591377060


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala:
##
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.api.{AbstractConsumerTest, BaseConsumerTest}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.stream.Stream
+
+/**
+ * Integration tests for the consumer that cover interaction with the consumer 
from within callbacks
+ * and listeners.
+ */
+class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerRebalanceListenerAssignOnPartitionsAssigned(quorum: String, 
groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsAssigned { (consumer, _) =>
+  val e: Exception = assertThrows(classOf[IllegalStateException], () => 
consumer.assign(Collections.singletonList(tp)))
+  assertEquals(e.getMessage, "Subscription to topics, partitions and 
pattern are mutually exclusive")

Review Comment:
   nit: the message is totally accurate, but this test has nothing to do with 
pattern subscription, so it may be clearer for whoever gets the failure to be 
specific about the problem in this test (subscribe & assign are mutually 
exclusive). We'll probably end up adding another test where the failure would 
be calling subscribe(Pattern) in the callback, and that one would require a 
message about the two subscribe calls being mutually exclusive. 
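   
   A hedged Java rendering of the idea (the quoted test is Scala; the names 
`consumer` and `tp` are assumed from it, and note that JUnit's assertEquals takes 
the expected value first):
   
   IllegalStateException e = assertThrows(IllegalStateException.class,
       () -> consumer.assign(Collections.singletonList(tp)));
   // asserting a message specific to the assign-vs-subscribe conflict keeps the failure readable
   assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage());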



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-06 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1591319255


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1700,16 +1710,16 @@ private void handleResponse(RaftResponse.Inbound 
response, long currentTimeMs) {
 }
 
 /**
- * Validate a request which is only valid between voters. If an error is
- * present in the returned value, it should be returned in the response.
+ * Validate common state for requests to establish leadership.
+ *
+ * These include the Vote, BeginQuorumEpoch and EndQuorumEpoch RPCs. If an 
error is present in
+ * the returned value, it should be returned in the response.
  */
 private Optional<Errors> validateVoterOnlyRequest(int remoteNodeId, int 
requestEpoch) {
 if (requestEpoch < quorum.epoch()) {
 return Optional.of(Errors.FENCED_LEADER_EPOCH);
 } else if (remoteNodeId < 0) {
 return Optional.of(Errors.INVALID_REQUEST);
-} else if (quorum.isObserver() || !quorum.isVoter(remoteNodeId)) {
-return Optional.of(Errors.INCONSISTENT_VOTER_SET);

Review Comment:
   In KIP-853, `INCONSISTENT_VOTER_SET` is deprecated and replicas will not 
return this error anymore.
   
   In this case replicas that think they are observers need to be allowed to 
vote if the leader thinks they are voters. This can happen if a voter is added 
to the set of voters right before an election cycle and the VotersRecord has 
not yet been replicated to the new voter.
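   
   Sketched against the diff above, the relaxed validation keeps only the epoch 
and node-id checks (illustrative, abridged):
   
   private Optional<Errors> validateVoterOnlyRequest(int remoteNodeId, int requestEpoch) {
       if (requestEpoch < quorum.epoch()) {
           return Optional.of(Errors.FENCED_LEADER_EPOCH);
       } else if (remoteNodeId < 0) {
           return Optional.of(Errors.INVALID_REQUEST);
       }
       // no INCONSISTENT_VOTER_SET branch: a replica that thinks it is an observer
       // may still grant a vote when the leader's voter set includes it
       return Optional.empty();
   }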



##
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##
@@ -112,45 +120,30 @@ public void initialize(OffsetAndEpoch 
logEndOffsetAndEpoch) throws IllegalStateE
 // when we send Vote or BeginEpoch requests.
 
 ElectionState election;
-try {
-election = store.readElectionState();
-if (election == null) {
-election = ElectionState.withUnknownLeader(0, voters);
-}
-} catch (final UncheckedIOException e) {
-// For exceptions during state file loading (missing or not 
readable),
-// we could assume the file is corrupted already and should be 
cleaned up.
-log.warn("Clearing local quorum state store after error loading 
state {}",
-store, e);
-store.clear();
-election = ElectionState.withUnknownLeader(0, voters);
-}
+election = store
+.readElectionState()
+.orElseGet(() -> ElectionState.withUnknownLeader(0, 
latestVoterSet.get().voterIds()));
 
 final EpochState initialState;
-if (!election.voters().isEmpty() && !voters.equals(election.voters())) 
{
-throw new IllegalStateException("Configured voter set: " + voters
-+ " is different from the voter set read from the state file: 
" + election.voters()
-+ ". Check if the quorum configuration is up to date, "
-+ "or wipe out the local state file if necessary");
-} else if (election.hasVoted() && !isVoter()) {
-String localIdDescription = localId.isPresent() ?
-localId.getAsInt() + " is not a voter" :
-"is undefined";
-throw new IllegalStateException("Initialized quorum state " + 
election
-+ " with a voted candidate, which indicates this node was 
previously "
-+ " a voter, but the local id " + localIdDescription);

Review Comment:
   In KIP-853, replicas that think they are observers need to be allowed to vote 
if the leader thinks they are voters. This can happen if a voter is added to 
the set of voters right before an election cycle and the VotersRecord has not 
yet been replicated to the new voter.



##
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##
@@ -336,40 +346,54 @@ public void transitionToUnattached(int epoch) {
  */
 public void transitionToVoted(
 int epoch,
-int candidateId
+ReplicaKey candidateKey
 ) {
-if (localId.isPresent() && candidateId == localId.getAsInt()) {
-throw new IllegalStateException("Cannot transition to Voted with 
votedId=" + candidateId +
-" and epoch=" + epoch + " since it matches the local 
broker.id");
-} else if (isObserver()) {
-throw new IllegalStateException("Cannot transition to Voted with 
votedId=" + candidateId +
-" and epoch=" + epoch + " since the local broker.id=" + 
localId + " is not a voter");
-} else if (!isVoter(candidateId)) {
-throw new IllegalStateException("Cannot transition to Voted with 
voterId=" + candidateId +
-" and epoch=" + epoch + " since it is not one of the voters " 
+ voters);
-}

Review Comment:
   In KIP-853, replicas that think they are observers need to be allowed to vote 
if the leader thinks they are 

Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]

2024-05-06 Thread via GitHub


lianetm commented on code in PR #15408:
URL: https://github.com/apache/kafka/pull/15408#discussion_r1591300734


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala:
##
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.api.{AbstractConsumerTest, BaseConsumerTest}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.stream.Stream
+
+/**
+ * Integration tests for the consumer that cover interaction with the consumer 
from within callbacks
+ * and listeners.

Review Comment:
   nit: "callbacks and listeners" in this context refer to the same right? 
maybe just "rebalance callbacks"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-05-06 Thread via GitHub


jeqo commented on PR #15379:
URL: https://github.com/apache/kafka/pull/15379#issuecomment-2096481453

   Thanks @C0urante! I've applied most suggestions, PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-05-06 Thread via GitHub


jeqo commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1591285305


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class SingleFieldPathTest {
+
+@Test void shouldFindField() {
+SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+assertEquals(barSchema.field("bar"), 
pathV2("foo.bar").fieldFrom(schema));
+assertEquals(schema.field("foo"), pathV2("foo").fieldFrom(schema));
+}
+
+@Test void shouldReturnNullFieldWhenFieldNotFound() {
+SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+assertNull(pathV2("un.known").fieldFrom(schema));
+assertNull(pathV2("foo.unknown").fieldFrom(schema));
+assertNull(pathV2("unknown").fieldFrom(schema));
+assertNull(pathV2("test").fieldFrom(null));
+}
+
+@Test void shouldFindValueInMap() {
+Map<String, Object> foo = new HashMap<>();
+foo.put("bar", 42);
+foo.put("baz", null);
+Map<String, Object> map = new HashMap<>();
+map.put("foo", foo);
+
+assertEquals(42, pathV2("foo.bar").valueFrom(map));
+assertNull(pathV2("foo.baz").valueFrom(map));
+}
+
+@Test void shouldReturnNullValueWhenFieldNotFoundInMap() {
+Map<String, Object> foo = new HashMap<>();
+foo.put("bar", 42);
+foo.put("baz", null);
+Map<String, Object> map = new HashMap<>();
+map.put("foo", foo);
+
+assertNull(new SingleFieldPath("un.known", 
FieldSyntaxVersion.V2).valueFrom(map));
+assertNull(new SingleFieldPath("foo.unknown", 
FieldSyntaxVersion.V2).valueFrom(map));
+assertNull(new SingleFieldPath("unknown", 
FieldSyntaxVersion.V2).valueFrom(map));
+assertNull(new SingleFieldPath("foo.baz", 
FieldSyntaxVersion.V2).valueFrom(map));
+assertNull(new SingleFieldPath("foo.baz.inner", 
FieldSyntaxVersion.V2).valueFrom(map));
+}
+
+@Test void shouldFindValueInStruct() {
+SchemaBuilder bazSchema = SchemaBuilder.struct()
+.field("inner", Schema.STRING_SCHEMA);
+SchemaBuilder barSchema = SchemaBuilder.struct()
+.field("bar", Schema.INT32_SCHEMA)
+.field("baz", bazSchema.optional());
+Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+Struct foo = new Struct(barSchema)
+.put("bar", 42)
+.put("baz", null);
+Struct struct = new Struct(schema).put("foo", foo);
+
+assertEquals(42, pathV2("foo.bar").valueFrom(struct));
+assertNull(pathV2("foo.baz").valueFrom(struct));
+}
+
+@Test void shouldReturnNullValueWhenFieldNotFoundInStruct() {
+SchemaBuilder bazSchema = SchemaBuilder.struct()
+.field("inner", Schema.STRING_SCHEMA);
+SchemaBuilder barSchema = SchemaBuilder.struct()
+.field("bar", Schema.INT32_SCHEMA)
+.field("baz", bazSchema.optional());
+Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+Struct foo = new Struct(barSchema)
+.put("bar", 42)
+.put("baz", null);
+Struct struct = new Struct(schema).put("foo", foo);
+
+assertNull(new SingleFieldPath("un.known", 
FieldSyntaxVersion.V2).valueFrom(struct));
+assertNull(new SingleFieldPath("foo.unknown", 
FieldSyntaxVersion.V2).valueFrom(struct));
+assertNull(new SingleFieldPath("unknown", 
FieldSyntaxVersion.V2).valueFrom(struct));
+assertNull(new 

Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-06 Thread via GitHub


gharris1727 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1591278453


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {
+log.warn("worker poll timeout has expired. This means the time between 
subsequent calls to poll() " +
+"in DistributedHerder tick() method was longer than the configured 
rebalance.timeout.ms. " +
+"If you see this happening consistently, then it can be addressed 
by either adding more workers " +
+"to the connect cluster or by increasing the rebalance.timeout.ms 
configuration value. Please note that " +

Review Comment:
   I think this is decent advice when requests are small and can be distributed 
around the cluster, but as REST requests are rather infrequent, I think this is 
the minority of cases.
   
   I think most often this timeout is going to be triggered by an excessively 
slow connector start, stop, or validation. In those cases, adding more workers 
does nothing but move the error to a different worker. I think we can keep the 
"adding more workers" comment, if we include another piece of advice for 
debugging excessively blocking tasks. If we don't have that other piece of 
advice, then advising users to add workers is misleading.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {

Review Comment:
   Since we (as maintainers) don't have good insight into what commonly causes 
the herder tick thread to block and the poll timeout to fire, we recently added 
https://issues.apache.org/jira/browse/KAFKA-15563 to help users debug it 
themselves.
   
   It would be nice to integrate with this system to have the heartbeat thread 
report what the herder tick thread was blocked on at the time that the poll 
timeout happened, as this would report stalling that isn't caused by REST 
requests.
   
   The integration is tricky though, because the WorkerCoordinator is (and 
should be) unaware of the DistributedHerder. And currently I think the 
WorkerCoordinator hides these internal disconnects and reconnects inside of the 
poll method. Perhaps we can extend the WorkerRebalanceListener or have a new 
error listener to allow the herder to be informed about these errors.
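   
   As a purely hypothetical sketch of that error-listener idea (nothing like this 
exists today; all names are made up):
   
   public interface HerderStallListener {
       // Called from the heartbeat thread when the poll timeout fires, carrying
       // whatever the herder tick thread was blocked on at that moment.
       void onPollTimeoutExpiry(String blockedStage, long blockedDurationMs);
   }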



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-06 Thread via GitHub


cadonna commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591277510


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId 
taskId) {
 tasks.addPendingTaskToCloseClean(taskId);
 }
 
+private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+   final Set<Task> tasksToCloseCleanFromStateUpdater,
+   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+iterateAndActOnFuture(futures, removedTaskResult -> {
+final Task task = removedTaskResult.task();
+final Optional<RuntimeException> exception = removedTaskResult.exception();
+if (exception.isPresent()) {
+tasksToCloseDirtyFromStateUpdater.add(task);
+} else {
+tasksToCloseCleanFromStateUpdater.add(task);
+}
+});
+}
+
+private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+   final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
+for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
+final TaskId taskId = entry.getKey();
+final CompletableFuture<StateUpdater.RemovedTaskResult> future = entry.getValue();
+try {
+final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
+action.accept(removedTaskResult);
+} catch (final ExecutionException executionException) {
+log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
+taskId, executionException);
+} catch (final InterruptedException ignored) { }

Review Comment:
   I am actually in favor of treating them as fatal and throwing an 
`IllegalStateException` to make it more explicit that interruption of a stream 
thread should not happen. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-06 Thread via GitHub


cadonna commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591262531


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1421,15 +1422,20 @@ public void 
shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
 .withInputPartitions(taskId02Partitions).build();
 final TasksRegistry tasks = mock(TasksRegistry.class);
 final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, 
task2, task3), tasks);
+final CompletableFuture future1 = new 
CompletableFuture<>();

Review Comment:
   Let me write one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-06 Thread via GitHub


cadonna commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591261680


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId 
taskId) {
 tasks.addPendingTaskToCloseClean(taskId);
 }
 
+private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+   final Set<Task> tasksToCloseCleanFromStateUpdater,
+   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+iterateAndActOnFuture(futures, removedTaskResult -> {
+final Task task = removedTaskResult.task();
+final Optional<RuntimeException> exception = removedTaskResult.exception();
+if (exception.isPresent()) {
+tasksToCloseDirtyFromStateUpdater.add(task);
+} else {
+tasksToCloseCleanFromStateUpdater.add(task);
+}
+});
+}
+
+private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+   final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
+for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
+final TaskId taskId = entry.getKey();
+final CompletableFuture<StateUpdater.RemovedTaskResult> future = entry.getValue();
+try {
+final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
+action.accept(removedTaskResult);
+} catch (final ExecutionException executionException) {
+log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
+taskId, executionException);
+} catch (final InterruptedException ignored) { }

Review Comment:
   We are quite inconsistent on how we treat `InterruptedException`. In some 
places we ignore them because they should not happen and in others we treat 
them as fatal and throw an `IllegalStateException` because they should not 
happen [1].
   
   [1] 
https://github.com/apache/kafka/blob/b36cf4ef977fb14bc57683630a9f3f3680705550/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L597
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16619) Unnecessary controller warning : "Loaded ZK migration state of NONE"

2024-05-06 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16619:
-
Labels: good-first-issue  (was: )

> Unnecessary controller warning : "Loaded ZK migration state of NONE"
> 
>
> Key: KAFKA-16619
> URL: https://issues.apache.org/jira/browse/KAFKA-16619
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 3.6.2
>Reporter: F Méthot
>Priority: Trivial
>  Labels: good-first-issue
>
> When we launch a fresh cluster of Kafka brokers and KRaft controllers, with no 
> ZooKeeper involved.
> We get this warning in the controller log:
> [2024-04-15 03:44:33,881] WARN [QuorumController id=3] Performing controller 
> activation. Loaded ZK migration state of NONE. 
> (org.apache.kafka.controller.QuorumController)
>  
> Our project has no business with ZooKeeper; seeing this message prompted us 
> to investigate and spend time looking up this warning to find an explanation.
> We have that setting
> {_}zookeeper.metadata.migration.enable{_}=false
> and we still get that warning.
> In future versions, to avoid further confusion, this message should not be 
> shown when ZooKeeper is not involved at all.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2096415182

   Thanks for the continued reviews @chia7712, I've addressed your latest 
feedback.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-06 Thread via GitHub


cadonna commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591223152


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }
 
+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);

Review Comment:
   Ah, OK, yes they are logged in the `DefaultStateUpdater` when they happen.






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-06 Thread via GitHub


gaurav-narula commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2096392283

   Resolved conflict with `trunk`





Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-06 Thread via GitHub


lucasbru commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591216607


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }
 
+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);

Review Comment:
   I said logging, not throwing. The error should be logged somewhere, right?






Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-06 Thread via GitHub


gharris1727 commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1591196797


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -16,6 +16,15 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.lang.reflect.Modifier;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteOrder;
+import java.nio.file.StandardOpenOption;
+import java.util.AbstractMap;
+import java.util.EnumSet;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;

Review Comment:
   I think this should be reverted, we don't touch anything else in this file.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    /**
+     * General-purpose validation logic for converters that are configured directly
+     * in a connector config (as opposed to inherited from the worker config).
+     * @param connectorConfig the configuration for the connector; may not be null
+     * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config;
+     *                          may be null, in which case no validation will be performed under the assumption that the
+     *                          connector will inherit the converter settings from the worker
+     * @param pluginInterface the interface for the plugin type
+     *                        (e.g., {@code org.apache.kafka.connect.storage.Converter.class});
+     *                        may not be null
+     * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef}
+     *                          from an instance of the plugin type (e.g., {@code Converter::config});
+     *                          may not be null
+     * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"});
+     *                   may not be null
+     * @param pluginProperty the property used to define a custom class for the plugin type
+     *                       in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+     *                       may not be null
+     * @param defaultProperties any default properties to include in the configuration that will be used for
+     *                          the plugin; may be null
+
+     * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config,
+     * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector
+     * config)
+
+     * @param <T> the plugin class to perform validation for
+     */
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue pluginConfigValue,
+            Class<T> pluginInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String pluginName,
+            String pluginProperty,
+            Map<String, String> defaultProperties
+    ) {
+        Objects.requireNonNull(connectorConfig);
+        Objects.requireNonNull(pluginInterface);
+        Objects.requireNonNull(configDefAccessor);
+        Objects.requireNonNull(pluginName);
+        Objects.requireNonNull(pluginProperty);
+
+        String pluginClass = connectorConfig.get(pluginProperty);
+
+        if (pluginClass == null
+                || pluginConfigValue == null
+                || !pluginConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T pluginInstance;
+        try {
+            pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e);
+            pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try {
+            ConfigDef configDef;
+            try {
+                configDef = configDefAccessor.apply(pluginInstance);
+            } catch (RuntimeException e) {
+                log.error("Failed to load ConfigDef from {} of type {}", pluginName, pluginClass, e);
+                pluginConfigValue.addErrorMessage("Failed to load ConfigDef from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+                return null;
+            }
+            if (configDef == null) {
+                log.warn("{}.config() has returned a null ConfigDef; no further preflight config 

Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-06 Thread via GitHub


cadonna commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591204696


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }
 
+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);
+            } else {
+                tasksToCloseCleanFromStateUpdater.add(task);
+            }
+        });
+    }
+
+    private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                       final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
+        for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
+            final TaskId taskId = entry.getKey();
+            final CompletableFuture<StateUpdater.RemovedTaskResult> future = entry.getValue();
+            try {
+                final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
+                action.accept(removedTaskResult);
+            } catch (final ExecutionException executionException) {
+                log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
+                    taskId, executionException);
+            } catch (final InterruptedException ignored) { }

Review Comment:
   Let me check...






Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-06 Thread via GitHub


cadonna commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591204373


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }
 
+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);

Review Comment:
   I thought that these tasks are lost anyway, so why should we bother throwing 
an exception.






Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-06 Thread via GitHub


cadonna commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591198311


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -71,6 +73,10 @@
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
 public class TaskManager {
+
+    private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +

Review Comment:
   No, we do not have that yet, but I had the same thought. I will look for a 
good place after these PRs are merged.
   






Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]

2024-05-06 Thread via GitHub


gharris1727 commented on PR #14729:
URL: https://github.com/apache/kafka/pull/14729#issuecomment-2096338791

   Hi @C0urante, could you take another pass on this?





Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-05-06 Thread via GitHub


jeffkbkim commented on code in PR #15835:
URL: https://github.com/apache/kafka/pull/15835#discussion_r1591194245


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -208,8 +209,8 @@ public void recordEventQueueTime(long durationMs) { }
     public void recordEventQueueProcessingTime(long durationMs) { }
 
     @Override
-    public void recordThreadIdleRatio(double ratio) {
-        threadIdleRatioSensor.record(ratio);
+    public synchronized void recordThreadIdleTime(long idleTimeMs, long currentTimeMs) {
+        threadIdleTimeRate.record(metrics.config(), idleTimeMs, currentTimeMs);

Review Comment:
   Thanks for the correction. Added the rate to the sensor.
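   
   For context on the API under discussion, a self-contained sketch of attaching a `Rate` to a `Sensor` so that recorded idle-time values are reported per unit of wall-clock time; the metric and sensor names here are illustrative, not the ones used in the PR:
   
```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class ThreadIdleRateExample {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor threadIdleSensor = metrics.sensor("thread-idle");
        // A Rate stat turns recorded idle milliseconds into idle time per
        // elapsed millisecond, i.e. an idle ratio.
        threadIdleSensor.add(
            metrics.metricName("thread-idle-ratio", "example-metrics"),
            new Rate(TimeUnit.MILLISECONDS));
        threadIdleSensor.record(25.0); // 25 ms spent idle
        metrics.close();
    }
}
```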






Re: [PR] KAFKA-16665: Allow to initialize newly assigned partition's positions without allowing fetching while callback runs [kafka]

2024-05-06 Thread via GitHub


lianetm commented on PR #15856:
URL: https://github.com/apache/kafka/pull/15856#issuecomment-2096322959

   Thanks @lucasbru, conflicts solved. Good pointer about the new file for 
callback tests; I created https://issues.apache.org/jira/browse/KAFKA-16675 
(assigned to me) to make sure I move the test as soon as that one gets merged. 





[jira] [Created] (KAFKA-16675) Move rebalance callback test for positions to callbacks test file

2024-05-06 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16675:
--

 Summary: Move rebalance callback test for positions to callbacks 
test file
 Key: KAFKA-16675
 URL: https://issues.apache.org/jira/browse/KAFKA-16675
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Integration test 
testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback was added 
to the PlaintextConsumerTest.scala in this PR 
https://github.com/apache/kafka/pull/15856, as there was no specific file for 
testing callbacks at the moment. Another PR is in-flight, adding the file for 
callback-related tests, https://github.com/apache/kafka/pull/15408. Once 15408 
gets merged, we should move 
testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback to it.  





Re: [PR] KAFKA-16665: Allow to initialize newly assigned partition's positions without allowing fetching while callback runs [kafka]

2024-05-06 Thread via GitHub


lucasbru commented on PR #15856:
URL: https://github.com/apache/kafka/pull/15856#issuecomment-2096266336

   @lianetm some conflicts need to be resolved





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-05-06 Thread via GitHub


lianetm commented on PR #15844:
URL: https://github.com/apache/kafka/pull/15844#issuecomment-2096232639

   Did we consider the approach of simply decoupling the request timeout from 
the application event timeout? We could issue the fetch request without a time 
boundary (max value probably), and get the application event result with the 
time boundary 
([here](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1672-L1676)).
 
   
   Expressing the intention when creating the request and event seems clearer 
and brings what we want: fetch requests would remain in the background thread 
until they get a response or timeout, so they could be reused by a following 
fetch application event (for the same partitions). Then we could keep the 
manager logic simple and consistent around how inflights are maintained 
(removed when they get a response or expire, as it is now). I may be missing 
something, thoughts? 
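   
   A hypothetical sketch of that decoupling; `OffsetFetcher`, `fetchOffsets`, and the deadline parameter are illustrative stand-ins, not the real consumer internals:
   
```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class DecoupledTimeoutSketch {

    interface OffsetFetcher {
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsets(
            Set<TopicPartition> partitions, long deadlineMs);
    }

    static Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(
            final OffsetFetcher fetcher,
            final Set<TopicPartition> partitions,
            final long callerTimeoutMs) throws Exception {
        // The request itself is effectively unbounded, so it stays inflight in
        // the background thread until a response arrives, and can be reused by
        // a later fetch event for the same partitions.
        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result =
            fetcher.fetchOffsets(partitions, Long.MAX_VALUE);
        // Only the application event is bounded by the caller's timeout.
        return result.get(callerTimeoutMs, TimeUnit.MILLISECONDS);
    }
}
```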





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1591129328


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersionOpt: Option[Int] = if (!enableEntityConfigNoController) {

Review Comment:
   The name "enableEntityConfigNoController" is meant to convey "Enable setting 
entity configs even when there is no controller". But even as I've been coding 
this i've mixed up the meaning more than once  
   
   I'll go with `enableEntityConfigControllerCheck`






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1591129328


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersionOpt: Option[Int] = if (!enableEntityConfigNoController) {

Review Comment:
   The name "enableEntityConfigNoController" is meant to convey "Enable setting 
entity configs even when there is no controller". But even as I've been coding 
this i've mixed up the meaning more than once  
   
   How about: `requireEntityConfigControllerCheck`?






[PR] MINOR: Various cleanups in clients tests [kafka]

2024-05-06 Thread via GitHub


mimaison opened a new pull request, #15877:
URL: https://github.com/apache/kafka/pull/15877

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-06 Thread via GitHub


lucasbru commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591121181


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }
 
+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);
+            } else {
+                tasksToCloseCleanFromStateUpdater.add(task);
+            }
+        });
+    }
+
+    private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                       final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
+        for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
+            final TaskId taskId = entry.getKey();
+            final CompletableFuture<StateUpdater.RemovedTaskResult> future = entry.getValue();
+            try {
+                final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
+                action.accept(removedTaskResult);
+            } catch (final ExecutionException executionException) {
+                log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
+                    taskId, executionException);
+            } catch (final InterruptedException ignored) { }

Review Comment:
   Can we just ignore this? I see other classes in the package rethrowing it.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -71,6 +73,10 @@
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
 public class TaskManager {
+
+    private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +

Review Comment:
   I see that's already defined in a couple of places. Did you check if there 
is a good utility class where this could be defined?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }
 
+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);

Review Comment:
   Is the exception already logged somewhere else? We are just dropping it here.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1421,15 +1422,20 @@ public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2, task3), tasks);
+        final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();

Review Comment:
   Do we need a test that covers the part where we get an exception?






Re: [PR] [KAFKA-16646] Don't run cve scan job on forks [kafka]

2024-05-06 Thread via GitHub


omkreddy merged PR #15831:
URL: https://github.com/apache/kafka/pull/15831





Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2024-05-06 Thread via GitHub


ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-2096203457

   Fixing the failing build





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1591110984


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
     admin.alterUserScramCredentials(alterations)
   }
 
-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(1) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit = {
+    maybeRetry(shouldRetry, 1) {
       val propsAfter = zkClient.getEntityConfigs(ConfigType.TOPIC, "test")
       assertEquals("204800", propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
       assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
     }
   }
 
-  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(1) {
-      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
-      assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, "").getProperty("consumer_byte_rate"))
-      assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
-      assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
-      assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate"))
+  def verifyBrokerConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit = {
+    maybeRetry(shouldRetry, 1) {
+      val defaultBrokerProps = zkClient.getEntityConfigs(ConfigType.BROKER, "")
+      assertEquals("8640", defaultBrokerProps.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+      val broker0Props = zkClient.getEntityConfigs(ConfigType.BROKER, "0")
+      assertEquals("4320", broker0Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+      val broker1Props = zkClient.getEntityConfigs(ConfigType.BROKER, "1")
+      assertEquals("4320", broker1Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
     }
   }
 
+  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {

Review Comment:
   Ah good catch! I did not mean to change this logic. Will fix.






Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]

2024-05-06 Thread via GitHub


nizhikov commented on PR #15873:
URL: https://github.com/apache/kafka/pull/15873#issuecomment-2096190310

   Hello @chia7712 
   
   Second part of `ConfigCommandTest` refactoring ready for review.
   Please, take a look.





Re: [PR] John confluent unclean recovery elect leader request [kafka]

2024-05-06 Thread via GitHub


mannoopj commented on PR #15876:
URL: https://github.com/apache/kafka/pull/15876#issuecomment-2096185204

   WIP





[jira] [Assigned] (KAFKA-16673) Optimize toTopicPartitions with ConsumerProtocolSubscription

2024-05-06 Thread Dongnuo Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongnuo Lyu reassigned KAFKA-16673:
---

Assignee: Dongnuo Lyu

> Optimize toTopicPartitions with ConsumerProtocolSubscription
> 
>
> Key: KAFKA-16673
> URL: https://issues.apache.org/jira/browse/KAFKA-16673
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>
> https://github.com/apache/kafka/pull/15798#discussion_r1582981154





[jira] [Assigned] (KAFKA-16674) Adjust classicGroupJoinToConsumerGroup to add subscription model

2024-05-06 Thread Dongnuo Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongnuo Lyu reassigned KAFKA-16674:
---

Assignee: Dongnuo Lyu

> Adjust classicGroupJoinToConsumerGroup to add subscription model
> 
>
> Key: KAFKA-16674
> URL: https://issues.apache.org/jira/browse/KAFKA-16674
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>
> [https://github.com/apache/kafka/pull/15785] adds subscription model to the 
> group state that affects `classicGroupJoinToConsumerGroup`. We'll need to 
> make adjustment to comply with the change once #15785 is merged.





[jira] [Created] (KAFKA-16674) Adjust classicGroupJoinToConsumerGroup to add subscription model

2024-05-06 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-16674:
---

 Summary: Adjust classicGroupJoinToConsumerGroup to add 
subscription model
 Key: KAFKA-16674
 URL: https://issues.apache.org/jira/browse/KAFKA-16674
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu


[https://github.com/apache/kafka/pull/15785] adds subscription model to the 
group state that affects `classicGroupJoinToConsumerGroup`. We'll need to make 
adjustment to comply with the change once #15785 is merged.





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-05-06 Thread via GitHub


lianetm commented on PR #15844:
URL: https://github.com/apache/kafka/pull/15844#issuecomment-2096155474

   One concern, raised in the 
[comment](https://github.com/apache/kafka/pull/15844#discussion_r1591065112) 
above, is about how we identify this situation (inflight fetch requests that we 
shouldn't delete too soon). 
   
   Another one is about where to consider the situation. Inflight requests are 
removed in 2 places: the direct call to fetch (handled in this PR), but also 
the commit manager poll. The commit manager (as other managers) has logic for 
removing all expired requests in its poll loop, when calling 
[failAndRemoveExpiredFetchRequests](https://github.com/apache/kafka/blob/42754336e1ff35cb45661f1a906fc24b761b27cf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1182).
 Shouldn't we consider that too?
   
   





[jira] [Created] (KAFKA-16673) Optimize toTopicPartitions with ConsumerProtocolSubscription

2024-05-06 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-16673:
---

 Summary: Optimize toTopicPartitions with 
ConsumerProtocolSubscription
 Key: KAFKA-16673
 URL: https://issues.apache.org/jira/browse/KAFKA-16673
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu


https://github.com/apache/kafka/pull/15798#discussion_r1582981154





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-05-06 Thread via GitHub


lianetm commented on code in PR #15844:
URL: https://github.com/apache/kafka/pull/15844#discussion_r1591065112


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1145,14 +1141,42 @@ private CompletableFuture> addOffsetFetch
 inflightOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
 
 if (dupe.isPresent() || inflight.isPresent()) {
-log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
-dupe.orElseGet(inflight::get).chainFuture(request.future);
+log.info("Duplicate OffsetFetchRequest found for partitions: 
{}", request.requestedPartitions);
+OffsetFetchRequestState originalRequest = 
dupe.orElseGet(inflight::get);
+originalRequest.chainFuture(request.future);
 } else {
 this.unsentOffsetFetches.add(request);
 }
 return request.future;
 }
 
+/**
+ * Remove the {@link OffsetFetchRequestState request} from the 
inflight requests queue iff
+ * both of the following are true:
+ *
+ * 
+ * The request completed with a null {@link 
Throwable error}
+ * The request is not {@link OffsetFetchRequestState#isExpired 
expired}
+ * 
+ *
+ * 
+ *
+ * In some cases, even though an offset fetch request may complete 
without an error, technically
+ * the request took longer than the user's provided timeout. In that 
case, the application thread will
+ * still receive a timeout error, and will shortly try to fetch these 
offsets again. Keeping the result
+ * of the current attempt will enable the next 
attempt to use that result and return
+ * almost immediately.
+ */
+private void maybeRemoveInflightOffsetFetch(OffsetFetchRequestState 
fetchRequest, Throwable error) {
+if (error == null && !fetchRequest.isExpired) {

Review Comment:
   This line implies a big change in the current logic, one that I wonder if 
we're taking too far. I agree with not removing the expired requests (that's 
the root cause of the problem we have), but why put all errors (not only 
timeouts) in the same bucket? With this new check, how are we ensuring that 
fetch requests that fail fatally are removed from the inflight queue? 






Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-06 Thread via GitHub


johnnychhsu commented on PR #15861:
URL: https://github.com/apache/kafka/pull/15861#issuecomment-2096037200

   Updated; `./gradlew clean core:test --tests ClusterTestExtensionsTest 
--tests ClusterConfigTest` passed.





Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-06 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1590935155


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -203,6 +189,22 @@ public ConsumerGroupMember build() {
         return member;
     }
 
+    private boolean ownsRevokedPartitions(

Review Comment:
   nit: Should we add some javadoc?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class NoOpPartitionAssignor implements PartitionAssignor {
+    static final String NAME = "no-op";
+    @Override

Review Comment:
   nit: Let's add an empty line before this one.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -203,6 +189,22 @@ public ConsumerGroupMember build() {
         return member;
     }
 
+    private boolean ownsRevokedPartitions(
+        Map<Uuid, Set<Integer>> assignment
+    ) {
+        if (ownedTopicPartitions == null) return true;
+
+        for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
+            for (Integer partitionId : topicPartitions.partitions()) {
+                if (assignment.getOrDefault(topicPartitions.topicId(), Collections.emptySet()).contains(partitionId)) {

Review Comment:
   nit: There is a small optimization here. We could do 
`assignment.getOrDefault(topicPartitions.topicId(), Collections.emptySet())` 
before looping on the partitions.
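   
   Spelled out, the suggestion amounts to hoisting the lookup out of the inner loop; a sketch only, and the method tail past the quoted lines is assumed:
   
```java
private boolean ownsRevokedPartitions(
    Map<Uuid, Set<Integer>> assignment
) {
    if (ownedTopicPartitions == null) return true;

    for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
        // Look the topic up once, instead of once per partition.
        final Set<Integer> assignedPartitions =
            assignment.getOrDefault(topicPartitions.topicId(), Collections.emptySet());
        for (Integer partitionId : topicPartitions.partitions()) {
            if (assignedPartitions.contains(partitionId)) {
                return true;
            }
        }
    }
    return false;
}
```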






Re: [PR] John confluent unclean recovery elect leader request [kafka]

2024-05-06 Thread via GitHub


mannoopj commented on PR #15876:
URL: https://github.com/apache/kafka/pull/15876#issuecomment-2096008537

   WIP





[PR] MINOR: Correct connector scheduled rebalance logs [kafka]

2024-05-06 Thread via GitHub


yuz10 opened a new pull request, #15875:
URL: https://github.com/apache/kafka/pull/15875

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] MINOR: Remove dev_version parameter from streams tests [kafka]

2024-05-06 Thread via GitHub


lucasbru opened a new pull request, #15874:
URL: https://github.com/apache/kafka/pull/15874

   In two streams tests, we are using the current snapshot version as a test 
parameter `to_version`, but as the only option. We can hardcode it. This 
simplifies testing downstream, since the test parameters do not change with 
every version. In particular, some tests downstream are blacklisted because 
they do not work with ARM. These lists need to be updated every time 
`DEV_VERSION` is bumped.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-06 Thread via GitHub


johnnychhsu commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590943709


##
core/src/test/java/kafka/test/annotation/ClusterTest.java:
##
@@ -44,4 +44,5 @@
 String listener() default "";
 MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0;
 ClusterConfigProperty[] serverProperties() default {};
+Tags[] tags() default {};

Review Comment:
   I feel key-value pairs are more flexible: users can choose what to add for 
display by the key, which makes it easy to format and understand. But I am also 
open to the string array idea; both work for me.






Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-06 Thread via GitHub


johnnychhsu commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590940053


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -83,6 +81,7 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke
         this.saslServerProperties = Objects.requireNonNull(saslServerProperties);
         this.saslClientProperties = Objects.requireNonNull(saslClientProperties);
         this.perBrokerOverrideProperties = Objects.requireNonNull(perBrokerOverrideProperties);
+        this.tags = Objects.requireNonNull(extendTags(tags));

Review Comment:
   I see, you mean that besides the `nameTags` we should also keep the original 
tags and add a getter for them.






Re: [PR] MINOR: Remove `ConsumerGroupPartitionMetadataValue.Epoch` field [kafka]

2024-05-06 Thread via GitHub


dajac merged PR #15854:
URL: https://github.com/apache/kafka/pull/15854





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1590911279


##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -108,12 +117,12 @@ public void testClusterTests() {
 }
 
 @ClusterTests({
-@ClusterTest(clusterType = Type.ZK),
-@ClusterTest(clusterType = Type.ZK, disksPerBroker = 2),
-@ClusterTest(clusterType = Type.KRAFT),
-@ClusterTest(clusterType = Type.KRAFT, disksPerBroker = 2),
-@ClusterTest(clusterType = Type.CO_KRAFT),
-@ClusterTest(clusterType = Type.CO_KRAFT, disksPerBroker = 2)
+@ClusterTest(clusterType = Type.ZK),

Review Comment:
   ditto



##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -70,28 +79,28 @@ public void testClusterTest(ClusterInstance 
clusterInstance) {
 @ClusterTemplate("generate1")
 public void testClusterTemplate() {
 Assertions.assertEquals(ClusterInstance.ClusterType.ZK, 
clusterInstance.clusterType(),
-"generate1 provided a Zk cluster, so we should see that here");
+"generate1 provided a Zk cluster, so we should see that here");

Review Comment:
   ditto



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static java.util.Collections.singleton;
+
+class ConsumerGroupExecutor {

Review Comment:
   Other tests (for example: #15872) need both this and `generator`. How about 
renaming this to `ConsumerGroupCommandTestUtils` and move the implementation of 
`generator` to it?



##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -135,4 +144,49 @@ public void testNoAutoStart() {
     public void testDefaults(ClusterInstance clusterInstance) {
         Assertions.assertEquals(MetadataVersion.IBP_3_8_IV0, clusterInstance.config().metadataVersion());
     }
+
+    @ClusterTests({
+        @ClusterTest(name = "enable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+        }),
+        @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+        }),
+        @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+        }),
+        @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
+        }),
+        @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+        }),
+   

[PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]

2024-05-06 Thread via GitHub


nizhikov opened a new pull request, #15873:
URL: https://github.com/apache/kafka/pull/15873

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-16393) SslTransportLayer doesn't implement write(ByteBuffer[], int, int) correctly

2024-05-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16393.

Fix Version/s: 3.8.0
   Resolution: Fixed

> SslTransportLayer doesn't implement write(ByteBuffer[], int, int) correctly
> ---
>
> Key: KAFKA-16393
> URL: https://issues.apache.org/jira/browse/KAFKA-16393
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
> Fix For: 3.8.0
>
>
> As of Kafka 3.7.0, SslTransportLayer.write(ByteBuffer[], int, int) is 
> implemented like below:
> {code:java}
> public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
> ...
>     int i = offset;
>     while (i < length) {
>         if (srcs[i].hasRemaining() || hasPendingWrites()) {
> 
> {code}
> The loop index starts at `offset` and ends with `length`.
> However this isn't correct because end-index should be `offset + length`.
> Let's say we have the array of ByteBuffer with length = 5 and try calling 
> this method with offset = 3, length = 1.
> In current code, `write(srcs, 3, 1)` doesn't attempt any write because the 
> loop condition is immediately false.
> For now, this method seems to be called only with offset = 0 and length =
> srcs.length in the Kafka code base, so it is not causing any problem; still,
> we should fix it because it could introduce a subtle bug if the method is
> used with different args in the future.
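
The described fix changes only the loop bound; a sketch of the corrected loop, with the body elided as in the snippet above:

{code:java}
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
...
    int i = offset;
    while (i < offset + length) {   // previously: i < length
        if (srcs[i].hasRemaining() || hasPendingWrites()) {
...
{code}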





Re: [PR] KAFKA-16393 read/write sequence of buffers correctly [kafka]

2024-05-06 Thread via GitHub


chia7712 merged PR #15571:
URL: https://github.com/apache/kafka/pull/15571





Re: [PR] MINOR: use classic consumer with ZK mode for DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on code in PR #15872:
URL: https://github.com/apache/kafka/pull/15872#discussion_r1590841207


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -88,7 +113,7 @@ public void testDeleteOffsetsNonExistingGroup() {
         }
     }
 
-    @ClusterTest
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
         for (Map<String, Object> consumerConfig : consumerConfigs) {

Review Comment:
   `consumerConfigs` needs to be rewritten using 
`ClusterInstance#supportedGroupProtocols`
   
   see https://github.com/apache/kafka/pull/15766/files#r1590222192
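   
   A hedged sketch of what that rewrite could look like; `supportedGroupProtocols()` comes from the comment above, while the helper name and config wiring are illustrative:
   
```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;

// Build one consumer config per group protocol the cluster supports,
// instead of hard-coding the list of configs in the test.
static Map<GroupProtocol, Map<String, Object>> consumerConfigsFor(Set<GroupProtocol> supportedGroupProtocols) {
    Map<GroupProtocol, Map<String, Object>> configs = new HashMap<>();
    for (GroupProtocol protocol : supportedGroupProtocols) {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
            protocol.name().toLowerCase(Locale.ROOT));
        configs.put(protocol, consumerConfig);
    }
    return configs;
}
```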






Re: [PR] MINOR: use classic consumer with ZK mode for DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on PR #15872:
URL: https://github.com/apache/kafka/pull/15872#issuecomment-2095735857

   Maybe we should merge `DeleteOffsetsConsumerGroupCommandIntegrationTest` and 
`DeleteConsumerGroupsTest`. They are used to test delete-related commands.





Re: [PR] KAFKA-8721: Kafka metrics improvements [kafka]

2024-05-06 Thread via GitHub


jsto commented on PR #7121:
URL: https://github.com/apache/kafka/pull/7121#issuecomment-2095734292

   Hello. We are also being flagged with metrics-core EOL. Do we have any 
update on this task?





Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-06 Thread via GitHub


satishd commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590586656


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
     // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
     // segments eligible for deletion gets physically removed.
-    public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+    public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   Why is it increased to 10 secs?









Re: [PR] [1/N] ConfigCommandTest rewritten in java [kafka]

2024-05-06 Thread via GitHub


chia7712 merged PR #15850:
URL: https://github.com/apache/kafka/pull/15850




