Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-04 Thread via GitHub


soarez commented on code in PR #12174:
URL: https://github.com/apache/kafka/pull/12174#discussion_r1551129021


##
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##
@@ -260,9 +274,12 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
   }
 
   def killBroker(index: Int): Unit = {

Review Comment:
   The original implementation has a timeout of 5 minutes, so I'll keep the 
same behavior here.






Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-04 Thread via GitHub


soarez commented on code in PR #12174:
URL: https://github.com/apache/kafka/pull/12174#discussion_r1551132385


##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -197,12 +198,11 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 verifyNonDaemonThreadsStatus()
   }
 
-  @Disabled
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("kraft"))
   def testCleanShutdownWithKRaftControllerUnavailable(quorum: String): Unit = {

Review Comment:
   That's a good point. If the controller is unavailable, the shutdown can't
really be considered clean. However, I think it's better to just remove the
word `Clean` from the method name rather than replace it with `Dirty`. That
seems cleaner and less confusing, as it might not be clear what "dirty" refers
to. Please let me know if you disagree.






Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-04 Thread via GitHub


soarez commented on code in PR #12174:
URL: https://github.com/apache/kafka/pull/12174#discussion_r1551132979


##
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##
@@ -323,7 +324,7 @@ public void rollingBrokerRestart() {
 throw new IllegalStateException("Tried to restart brokers but 
the cluster has not been started!");
 }
 for (int i = 0; i < clusterReference.get().brokerCount(); i++) {
-clusterReference.get().killBroker(i);
+clusterReference.get().killBroker(i, Duration.ofSeconds(5));

Review Comment:
   I think this is unnecessary. Removing it.






[PR] MINOR: Update release.py to trim "-SNAPSHOT" from docs/js/templateData.js [kafka]

2024-04-04 Thread via GitHub


omkreddy opened a new pull request, #15651:
URL: https://github.com/apache/kafka/pull/15651

   Update release.py to trim "-SNAPSHOT" from docs/js/templateData.js while
adding the "Bump version commit" during the release process.
   





Re: Kafka capabilities

2024-04-04 Thread Kafka Life
Dear Kafka experts, could anyone who has this data share the details, please?

On Wed, Apr 3, 2024 at 3:42 PM Kafka Life  wrote:

> Hi Kafka users
>
> Does anyone have a document or PPT that showcases the capabilities of
> Kafka along with any cost-management capability?
> I have a customer who is still using IBM MQM and RabbitMQ. I want the
> client to consider Kafka for messaging and data streaming. I wanted to seek
> your expert help: if you have any document or PPT, I can propose it as an
> example. Could you please help?
>
> thanks and regards
> KrisG
>


[jira] [Created] (KAFKA-16470) kafka-dump-log --offsets-decoder should support new records

2024-04-04 Thread David Jacot (Jira)
David Jacot created KAFKA-16470:
---

 Summary: kafka-dump-log --offsets-decoder should support new 
records
 Key: KAFKA-16470
 URL: https://issues.apache.org/jira/browse/KAFKA-16470
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot








Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-04-04 Thread via GitHub


OmniaGM commented on PR #15335:
URL: https://github.com/apache/kafka/pull/15335#issuecomment-2036650723

   > This change LGTM! But I think we need to have tests for the scenario you 
described in JIRA, to make sure it won't happen again. Could you help add some 
of them? Maybe add in `ReplicaManagerTest`?
   
   I added a test that ensures an offline partition doesn't create a new
partition when `ReplicaManager::getOrCreatePartition` is triggered. The system
test in PR https://github.com/apache/kafka/pull/15409 should also cover the
full flow for this fix.





Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-04-04 Thread via GitHub


OmniaGM commented on PR #15335:
URL: https://github.com/apache/kafka/pull/15335#issuecomment-2036695494

   Thanks, and sorry for the delay. I was trying to find a test besides the
system test to cover the full scenario in this PR, but I think proving the
main cause is enough for now.





Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-04 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1551349514


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { log =>
+  val tp = log.topicPartition
+
+  log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+  log.removeLogMetrics()
+  futureLogs.remove(tp)
+
+  currentLogs.put(tp, log)
+  log.newMetrics()
+
+  info(s"Successfully renamed abandoned future log for $tp")
+}
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: 
TopicsImage): Iterable[UnifiedLog] = {
+futureLogs.values.flatMap { log =>
+  val topicId = log.topicId.getOrElse {
+throw new RuntimeException(s"The log dir $log does not have a topic 
ID, " +
+  "which is not allowed when running in KRaft mode.")
+  }
+  val partitionId = log.topicPartition.partition()
+  Option(newTopicsImage.getPartition(topicId, partitionId))
+.filter(pr => 
directoryId(log.parentDir).contains(pr.directory(brokerId)))
+.map(_ => log)

Review Comment:
   Addressed in 430546fa5a78118497ada6373607f6a25c78a8e8. Please take a look.






Re: [PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]

2024-04-04 Thread via GitHub


showuon merged PR #15532:
URL: https://github.com/apache/kafka/pull/15532





[jira] [Resolved] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-04-04 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16359.
---
Fix Version/s: 3.8.0
   3.7.1
   Resolution: Fixed

> kafka-clients-3.7.0.jar published to Maven Central is defective
> ---
>
> Key: KAFKA-16359
> URL: https://issues.apache.org/jira/browse/KAFKA-16359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Jeremy Norris
>Assignee: Apoorv Mittal
>Priority: Critical
> Fix For: 3.8.0, 3.7.1
>
>
> The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is 
> defective: it's {{META-INF/MANIFEST.MF}} bogusly include a {{Class-Path}} 
> element:
> {code}
> Manifest-Version: 1.0
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
> .5.jar slf4j-api-1.7.36.jar
> {code}
> This bogus {{Class-Path}} element leads to compiler warnings for projects 
> that utilize it as a dependency:
> {code}
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": 
> no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> {code}
> Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published 
> without the bogus {{Class-Path}} element in it's {{META-INF/MANIFEST.MF}} or 
> a new release should be published that corrects this defect.
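
Not part of the original report, but a quick local check of the defect using
only standard JDK jar APIs; a sketch assuming the default Maven repository
layout (the path is illustrative):

```scala
import java.util.jar.JarFile

object ManifestCheck extends App {
  // Prints the Class-Path attribute of a locally cached kafka-clients jar.
  // The defective 3.7.0 jar prints the bogus entry; a fixed jar prints null.
  val path = sys.props("user.home") +
    "/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar"
  val jar = new JarFile(path)
  try println(jar.getManifest.getMainAttributes.getValue("Class-Path"))
  finally jar.close()
}
```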





Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on code in PR #12174:
URL: https://github.com/apache/kafka/pull/12174#discussion_r1551371510


##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -197,12 +198,11 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 verifyNonDaemonThreadsStatus()
   }
 
-  @Disabled
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("kraft"))
   def testCleanShutdownWithKRaftControllerUnavailable(quorum: String): Unit = {

Review Comment:
   That sounds good to me.






Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15639:
URL: https://github.com/apache/kafka/pull/15639#issuecomment-2036758667

   > have delayed backporting either change to 3.6 as we're currently in an 
ongoing release. If you'd like me to backport it now, I can do that.
   
   That makes sense. Let's wait for RC2 to get votes.





Re: [PR] KAFKA-16383: fix flaky IdentityReplicationIntegrationTest .testReplicateFromLatest [kafka]

2024-04-04 Thread via GitHub


vamossagar12 commented on PR #15556:
URL: https://github.com/apache/kafka/pull/15556#issuecomment-2036758332

   @johnnychhsu I am still not sure that this is the root cause. As I said 
above:
   
   > I ran the test locally and noticed the same plugin loading errors but the 
test did pass eventually. Moreover, all the other tests in 
IdentityReplicationIntegrationTest also throw the same connect plugin related 
errors but they pass on my local. You might want to debug further to find the 
actual reason for the flakiness.
   
   Maybe we can monitor a few more builds after the other PR gets merged to 
verify?





[PR] Test PR without the circular dependency code [kafka]

2024-04-04 Thread via GitHub


vamossagar12 opened a new pull request, #15653:
URL: https://github.com/apache/kafka/pull/15653

   (no comment)





Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]

2024-04-04 Thread via GitHub


vamossagar12 commented on code in PR #15642:
URL: https://github.com/apache/kafka/pull/15642#discussion_r1551420344


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java:
##
@@ -69,6 +69,9 @@ public static void setup() {
 Map workerProps = new HashMap<>();
 workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID);
 
+// Work around a circular-dependency in TestPlugins.
+TestPlugins.pluginPath();

Review Comment:
   Thanks for the explanation. Hmm, I ran the tests in
`ConnectorValidationIntegrationTest` without this one line but including all
the others, and the tests pass for me locally. I created a draft branch (we
can close it once we have validated the behaviour),
https://github.com/apache/kafka/pull/15653, with the changes. We can monitor
whether the circular-dependency-related errors show up there or not.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-04 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2036805236

   @showuon would you have time to review this? It's a smallish change. Thanks!





Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-04 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1551485397


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy"

Review Comment:
   I am still not fully satisfied with this property because 
`group.consumer.upgrade.policy = upgrade` reads weird. What could we use to 
replace `upgrade` here? `conversion`? `migration`?



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"

Review Comment:
   nit: While we are here, could you add the missing space after `=`?



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -677,6 +679,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval 
for registered consumers."
   val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single 
consumer group can accommodate."
   val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full 
class names. The first one in the list is considered as the default assignor to 
be used in the case where the consumer does not specify an assignor."
+  val GroupConsumerUpgradePolicyDoc = "The config that enables the group 
protocol upgrade/downgrade. The valid values are " + 
Utils.join(Utils.enumOptions(classOf[GroupConsumerUpgradePolicy]), ",") + "."

Review Comment:
   It would be great if we could extend the documentation here a little bit. I
think we need to call out that this is about converting a classic group using
the embedded consumer protocol to the consumer group protocol and vice versa.
We could also call out the various policies with a short description of each.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConsumerUpgradePolicy.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupConsumerUpgradePolicy {

Review Comment:
   nit: `GroupConsumer` to `ConsumerGroup`.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1700,6 +1704,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val consumerGroupMaxHeartbeatIntervalMs = 
getInt(KafkaConfig.ConsumerGroupMaxHeartbeatIntervalMsProp)
   val consumerGroupMaxSize = getInt(KafkaConfig.ConsumerGroupMaxSizeProp)
   val consumerGroupAssignors = 
getConfiguredInstances(KafkaConfig.ConsumerGroupAssignorsProp, 
classOf[PartitionAssignor])
+  val groupConsumerUpgradePolicy = 
GroupConsumerUpgradePolicy.parse(getString(KafkaConfig.GroupConsumerUpgradePolicyProp))

Review Comment:
   nit: `consumerGroup..`



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy"

Review Comment:
   nit: `ConsumerGroupUpgradePolicyProp`



##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -151,6 +152,7 @@ public class Defaults {
 UniformAssignor.class.getName(),
 RangeAssignor.class.getName()
 );
+public static final String GROUP_CONSUMER_UPGRADE_POLICY = 
GroupConsumerUpgradePolicy.DISABLED.toString();

Review Comment:
   nit: `CONSUMER_GROUP_`




Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-04 Thread via GitHub


nizhikov commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2037008993

   Hello @chia7712
   Looks like CI is OK. Can you please take a look?





Re: [PR] [Draft] Add type check to classic group timeout operations [kafka]

2024-04-04 Thread via GitHub


dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1551577818


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2814,7 +2820,9 @@ private CoordinatorResult 
tryCompleteInitialRebalanceElseSchedule(
 int delayMs,
 int remainingMs
 ) {
-if (group.newMemberAdded() && remainingMs != 0) {
+if (!containsClassicGroup(group.groupId())) {
+log.info("Group {} is null or not a classic group, skipping the 
initial rebalance stage.", group.groupId());
+} else if (group.newMemberAdded() && remainingMs != 0) {

Review Comment:
   Same comment.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2451,6 +2451,8 @@ private CoordinatorResult 
completeClassicGroupJoin(
 
 if (group.isInState(DEAD)) {
 log.info("Group {} is dead, skipping rebalance stage.", groupId);
+} else if (!containsClassicGroup(group.groupId())) {
+log.info("Group {} is null or not a classic group, skipping 
rebalance stage.", groupId);

Review Comment:
   I find this check a little unexpected here because the method receives a
`ClassicGroup`. It seems that `completeClassicGroupJoin` is called from two
different contexts. In the first one, it is called in the regular processing of
the request, so the `ClassicGroup` is available and we know that it is fine. In
the second one, it is called from an expired timer. In this case, we need to
validate that the group still exists. Therefore, I wonder whether we should add
an overload of `completeClassicGroupJoin` which takes the group id and looks up
the group based on it. We could then use it in the "timer context". What do you
think?
   
   



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2963,7 +2971,9 @@ private CoordinatorResult expirePendingSync(
 ClassicGroup group,
 int generationId
 ) {
-if (generationId != group.generationId()) {
+if (!containsClassicGroup(group.groupId())) {

Review Comment:
   It looks like we could change the param here.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2553,6 +2555,10 @@ private CoordinatorResult 
expireClassicGroupMemberHeartbeat(
 log.info("Received notification of heartbeat expiration for member 
{} after group {} " +
 "had already been unloaded or deleted.",
 memberId, group.groupId());
+} else if (!containsClassicGroup(group.groupId())) {
+log.info("Received notification of heartbeat expiration for member 
{} after group {} " +
+"had already been deleted or upgraded.",
+memberId, group.groupId());

Review Comment:
   I have a similar comment for this one. However, in this case, we could just
replace the `group` argument with `groupId` and do the lookup here.






Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2037116797

   > Looks like CI is OK. Can you please take a look?
   
   Thanks for the updated PR. Sorry, I'm digging into the flaky tests in our
CI, but I will take a look at this one ASAP.





[PR] debug for #15489 [kafka]

2024-04-04 Thread via GitHub


chia7712 opened a new pull request, #15654:
URL: https://github.com/apache/kafka/pull/15654

   we can't get correct offsets :(
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on code in PR #15645:
URL: https://github.com/apache/kafka/pull/15645#discussion_r1551656009


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
+import kafka.server.KafkaConfig;
+import kafka.server.QuorumTestHarness;
+import kafka.utils.TestInfoUtils;
+import kafka.zk.AdminZkClient;
+import kafka.zk.BrokerInfo;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.security.PasswordEncoder;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+public class ConfigCommandIntegrationTest extends QuorumTestHarness {
+/** @see TestInfoUtils#TestWithParameterizedQuorumName()  */
+public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = 
"{displayName}.{argumentsWithNames}";

Review Comment:
   It seems we should use a `junit-platform.properties` file instead of
defining it for all test cases. We have addressed this for part of the modules
(see #14983). It would be nice if we could add `junit-platform.properties` to
all test modules. WDYT? We can complete it in another PR before this one.






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-04-04 Thread via GitHub


mumrah merged PR #14706:
URL: https://github.com/apache/kafka/pull/14706





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-04-04 Thread via GitHub


mumrah commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1551680241


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2185,157 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+}
+
+Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
+List topicNamesList,
+Map> topicFutures,
+Map nodes,
+DescribeTopicsOptions options,
+long now
+) {
+Map topicsRequests = new LinkedHashMap<>();
+topicNamesList.stream().sorted().forEach(topic -> {
+topicsRequests.put(topic, new TopicRequest().setName(topic));
+});
+return new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics = topicsRequests;

Review Comment:
   Why do we need this inner assignment? Can we make `topicsRequests` final in
the outer scope?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.

Review Comment:
   @CalvinConfluent can you open a ticket for this so we don't forget about it?






Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on code in PR #12174:
URL: https://github.com/apache/kafka/pull/12174#discussion_r1551687551


##
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##
@@ -260,9 +274,12 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
   }
 
   def killBroker(index: Int): Unit = {

Review Comment:
   My point was: should we keep `_brokers(index).awaitShutdown()`? It seems
the latch used by `awaitShutdown` should be notified after `shutdown`
completes, so maybe it is fine to remove it. It would be great to add a
comment if you prefer to remove `awaitShutdown`.
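
To make the latch point concrete, a self-contained sketch under stated
assumptions: `FakeBroker` is illustrative, not the real broker classes, and it
assumes `shutdown` releases the same latch that `awaitShutdown` waits on, as
described above. The 5-minute bound mirrors the original behavior mentioned
earlier in this thread.

```scala
import java.time.Duration
import java.util.concurrent.{CountDownLatch, TimeUnit}

class FakeBroker {
  private val shutdownLatch = new CountDownLatch(1)

  def shutdown(): Unit = {
    // ... stop components ...
    shutdownLatch.countDown() // released before shutdown() returns
  }

  // Waits on the same latch, so calling it right after shutdown() adds no
  // new signal; a bounded wait still turns a hang into a clear failure.
  def awaitShutdown(timeout: Duration): Boolean =
    shutdownLatch.await(timeout.toMillis, TimeUnit.MILLISECONDS)
}

object KillBrokerSketch extends App {
  val broker = new FakeBroker
  broker.shutdown()
  require(broker.awaitShutdown(Duration.ofMinutes(5)), "broker did not shut down in time")
}
```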






[jira] [Created] (KAFKA-16471) SslTransportLayer may leak SSLEngine resources

2024-04-04 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16471:
-

 Summary: SslTransportLayer may leak SSLEngine resources
 Key: KAFKA-16471
 URL: https://issues.apache.org/jira/browse/KAFKA-16471
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 3.6.1, 3.7.0
Reporter: Gaurav Narula


{{SslTransportLayer}} does not invoke {{SSLEngine::closeInbound}} in 
{{close()}} after flushing the {{close_notify}} TLS alert.

While this isn't a problem for the default JDK SSLEngine, it results in a
resource leak in the Netty/OpenSSL-based SSLEngine, which frees native
resources only when {{closeInbound}} is invoked.





[jira] [Assigned] (KAFKA-16471) SslTransportLayer may leak SSLEngine resources

2024-04-04 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula reassigned KAFKA-16471:
-

Assignee: Gaurav Narula

> SslTransportLayer may leak SSLEngine resources
> --
>
> Key: KAFKA-16471
> URL: https://issues.apache.org/jira/browse/KAFKA-16471
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
>
> {{SslTransportLayer}} does not invoke {{SSLEngine::closeInbound}} in 
> {{close()}} after flushing the {{close_notify}} TLS alert.
> While this isn't a problem for the default JDK SSLEngine, it results in 
> resource leak in Netty/OpenSSL based SSLEngine which frees native resources 
> only when {{closeInbound}} is invoked.





[PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-04 Thread via GitHub


OmniaGM opened a new pull request, #15656:
URL: https://github.com/apache/kafka/pull/15656

   part 3 for https://github.com/apache/kafka/pull/15501
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-16471: invoke SSLEngine::closeInbound on SslTransportLayer close [kafka]

2024-04-04 Thread via GitHub


gaurav-narula opened a new pull request, #15655:
URL: https://github.com/apache/kafka/pull/15655

   Invokes `SSLEngine::closeInbound` after we flush the close_notify alert to
the socket. This fixes a memory leak in the Netty/OpenSSL-based SSLEngine,
which only frees native resources once closeInbound has been invoked.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16471: invoke SSLEngine::closeInbound on SslTransportLayer close [kafka]

2024-04-04 Thread via GitHub


gaurav-narula commented on PR #15655:
URL: https://github.com/apache/kafka/pull/15655#issuecomment-2037308254

   CC: @rajinisivaram @ijuma @harshach 





[jira] [Updated] (KAFKA-16471) SslTransportLayer may leak SSLEngine resources

2024-04-04 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula updated KAFKA-16471:
--
Fix Version/s: 3.8.0
   3.7.1

> SslTransportLayer may leak SSLEngine resources
> --
>
> Key: KAFKA-16471
> URL: https://issues.apache.org/jira/browse/KAFKA-16471
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> {{SslTransportLayer}} does not invoke {{SSLEngine::closeInbound}} in 
> {{close()}} after flushing the {{close_notify}} TLS alert.
> While this isn't a problem for the default JDK SSLEngine, it results in 
> resource leak in Netty/OpenSSL based SSLEngine which frees native resources 
> only when {{closeInbound}} is invoked.





Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-04 Thread via GitHub


OmniaGM commented on PR #15656:
URL: https://github.com/apache/kafka/pull/15656#issuecomment-2037325362

   @nizhikov can you have a look when you have time please?





Re: [PR] KAFKA-16471: invoke SSLEngine::closeInbound on SslTransportLayer close [kafka]

2024-04-04 Thread via GitHub


OmniaGM commented on code in PR #15655:
URL: https://github.com/apache/kafka/pull/15655#discussion_r1551787559


##
clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java:
##
@@ -199,6 +199,14 @@ public void close() throws IOException {
 } catch (IOException ie) {
 log.debug("Failed to send SSL Close message", ie);
 } finally {
+try {
+sslEngine.closeInbound();
+} catch (SSLException e) {
+// This is a debug log because an exception may be raised here 
frequently due to peers which do not
+// follow the TLS spec and fail to send a close_notify alert. 
Even if they do, we currently don't read
+// data from the socket after close() is invoked.

Review Comment:
   ```suggestion
   // This log is for debugging purposes as an exception might 
occur frequently 
   // at this point due to peers not following the TLS specs 
and failing to send a close_notify alert. 
   // Even if they do, currently, we do not read data from the 
socket after invoking close().
   ```






Re: [PR] KAFKA-16471: invoke SSLEngine::closeInbound on SslTransportLayer close [kafka]

2024-04-04 Thread via GitHub


OmniaGM commented on code in PR #15655:
URL: https://github.com/apache/kafka/pull/15655#discussion_r1551790292


##
clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java:
##
@@ -1528,4 +1530,30 @@ public void 
testHandshakeUnwrapContinuesUnwrappingOnNeedUnwrapAfterAllBytesRead(
 assertEquals(SSLEngineResult.Status.OK, result.getStatus());
 assertEquals(SSLEngineResult.HandshakeStatus.NEED_WRAP, 
result.getHandshakeStatus());
 }
+
+@Test
+public void testSSLEngineCloseInboundInvokedOnClose() throws IOException {
+// Given
+SSLEngine sslEngine = mock(SSLEngine.class);
+Socket socket = mock(Socket.class);
+SocketChannel socketChannel = mock(SocketChannel.class);
+SelectionKey selectionKey = mock(SelectionKey.class);
+when(socketChannel.socket()).thenReturn(socket);
+when(selectionKey.channel()).thenReturn(socketChannel);
+SslTransportLayer sslTransportLayer = new SslTransportLayer(
+"test-channel",
+selectionKey,
+sslEngine,
+mock(ChannelMetadataRegistry.class)
+);
+

Review Comment:
   Could you please remove any unnecessary extra lines?






Re: [PR] KAFKA-16471: invoke SSLEngine::closeInbound on SslTransportLayer close [kafka]

2024-04-04 Thread via GitHub


OmniaGM commented on code in PR #15655:
URL: https://github.com/apache/kafka/pull/15655#discussion_r1551791995


##
clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java:
##
@@ -204,7 +204,9 @@ public void close() throws IOException {
 } catch (SSLException e) {
 // This is a debug log because an exception may be raised here 
frequently due to peers which do not
 // follow the TLS spec and fail to send a close_notify alert. 
Even if they do, we currently don't read
-// data from the socket after close() is invoked.
+// This log is for debugging purposes as an exception might 
occur frequently 

Review Comment:
   The suggestion got applied wrongly; can you fix it, please? The first two
lines still exist.






Re: [PR] KAFKA-16471: invoke SSLEngine::closeInbound on SslTransportLayer close [kafka]

2024-04-04 Thread via GitHub


gaurav-narula commented on code in PR #15655:
URL: https://github.com/apache/kafka/pull/15655#discussion_r1551794309


##
clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java:
##
@@ -1528,4 +1530,30 @@ public void 
testHandshakeUnwrapContinuesUnwrappingOnNeedUnwrapAfterAllBytesRead(
 assertEquals(SSLEngineResult.Status.OK, result.getStatus());
 assertEquals(SSLEngineResult.HandshakeStatus.NEED_WRAP, 
result.getHandshakeStatus());
 }
+
+@Test
+public void testSSLEngineCloseInboundInvokedOnClose() throws IOException {
+// Given
+SSLEngine sslEngine = mock(SSLEngine.class);
+Socket socket = mock(Socket.class);
+SocketChannel socketChannel = mock(SocketChannel.class);
+SelectionKey selectionKey = mock(SelectionKey.class);
+when(socketChannel.socket()).thenReturn(socket);
+when(selectionKey.channel()).thenReturn(socketChannel);
+SslTransportLayer sslTransportLayer = new SslTransportLayer(
+"test-channel",
+selectionKey,
+sslEngine,
+mock(ChannelMetadataRegistry.class)
+);
+

Review Comment:
   Force-pushed the fixes for both comments.






Re: [PR] KAFKA-16471: invoke SSLEngine::closeInbound on SslTransportLayer close [kafka]

2024-04-04 Thread via GitHub


OmniaGM commented on PR #15655:
URL: https://github.com/apache/kafka/pull/15655#issuecomment-2037372617

   Nice catch @gaurav-narula! The fix seems straightforward. LGTM, assuming
the pipeline passes.





Re: [PR] KAFKA-16470; kafka-dump-log --offsets-decoder should support new records [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on code in PR #15652:
URL: https://github.com/apache/kafka/pull/15652#discussion_r1551795883


##
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##
@@ -398,9 +399,34 @@ object DumpLogSegments {
 }
   }
 
-  private class OffsetsMessageParser extends MessageParser[String, String] {
+  // Package private for testing.
+  class OffsetsMessageParser extends MessageParser[String, String] {
+private val serde = new RecordSerde()
+
 override def parse(record: Record): (Option[String], Option[String]) = {
-  GroupMetadataManager.formatRecordKeyAndValue(record)
+  if (!record.hasKey)
+throw new RuntimeException(s"Failed to decode message at offset 
${record.offset} using offset " +
+  "topic decoder (message had a missing key)")
+
+  try {
+val r = serde.deserialize(record.key, record.value)
+(
+  Some(r.key.message.toString),
+  Option(r.value).map(_.message.toString).orElse(Some(""))

Review Comment:
   It seems the `r.value` is never null ( 
https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java#L101
 )






Re: [PR] KAFKA-16470; kafka-dump-log --offsets-decoder should support new records [kafka]

2024-04-04 Thread via GitHub


dajac commented on code in PR #15652:
URL: https://github.com/apache/kafka/pull/15652#discussion_r1551827679


##
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##
@@ -398,9 +399,34 @@ object DumpLogSegments {
 }
   }
 
-  private class OffsetsMessageParser extends MessageParser[String, String] {
+  // Package private for testing.
+  class OffsetsMessageParser extends MessageParser[String, String] {
+private val serde = new RecordSerde()
+
 override def parse(record: Record): (Option[String], Option[String]) = {
-  GroupMetadataManager.formatRecordKeyAndValue(record)
+  if (!record.hasKey)
+throw new RuntimeException(s"Failed to decode message at offset 
${record.offset} using offset " +
+  "topic decoder (message had a missing key)")
+
+  try {
+val r = serde.deserialize(record.key, record.value)
+(
+  Some(r.key.message.toString),
+  Option(r.value).map(_.message.toString).orElse(Some(""))

Review Comment:
   It could be null. See here: 
https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java#L92.
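
To make the disagreement concrete, a small self-contained sketch of the
tombstone case (the `Value` class and placeholder string are schematic, not
the actual RecordSerde types):

```scala
object TombstoneSketch extends App {
  final case class Value(message: String)

  // A tombstone in a compacted topic has a key but a null value. Option(null)
  // collapses to None, so map(...) is skipped and orElse supplies a
  // placeholder instead of the parser throwing an NPE on the null value.
  val tombstone: Value = null
  val rendered = Option(tombstone).map(_.message.toString).orElse(Some("<tombstone>")).get
  println(rendered) // prints: <tombstone>
}
```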






Re: [PR] KAFKA-16470; kafka-dump-log --offsets-decoder should support new records [kafka]

2024-04-04 Thread via GitHub


dajac commented on PR #15652:
URL: https://github.com/apache/kafka/pull/15652#issuecomment-2037428994

   > Also, it seems the following classes are useless.
   
   Yeah, I noticed those classes too. I am actually not sure why we kept them.
It may be for backward compatibility. I need to dig a little more into it. It
is anyway not related to this patch, so we can treat it separately.
   
   > Not sure whether this will hurt the users who are using this tool to 
observe the assignments. Maybe we can parse subscription and assignment 
according to official kafka consumer, and convert the parse exception 
(https://issues.apache.org/jira/browse/KAFKA-15603) to log message instead of 
throwing exception. WDYT?
   
   I don't think users rely on this tool to read the assignments. At least, I
never did. The annoying part is that we cannot reuse the auto-generated
structures if we want to preserve this. We would need to introduce a new
structure which is basically the autogenerated one plus new fields for holding
the deserialised arrays. I am not a fan of this because they will surely
diverge in the future. I will think a little more about it.





Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-04 Thread via GitHub


dongnuo123 commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1551865511


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy"

Review Comment:
   Let me change it to `consumer.group.migration.policy`






[jira] [Created] (KAFKA-16472) Integration tests in Java don't really run kraft case

2024-04-04 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-16472:
-

 Summary: Integration tests in Java don't really run kraft case
 Key: KAFKA-16472
 URL: https://issues.apache.org/jira/browse/KAFKA-16472
 Project: Kafka
  Issue Type: Test
Reporter: PoAn Yang
Assignee: PoAn Yang


The following test cases don't really run the kraft case. The reason is that
the test info doesn't contain the parameter name, so TestInfoUtils#isKRaft
always returns false.
 * TopicCommandIntegrationTest
 * DeleteConsumerGroupsTest
 * AuthorizerIntegrationTest
 * DeleteOffsetsConsumerGroupCommandIntegrationTest

 

We can add `options.compilerArgs += '-parameters'` after 
[https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273]
 to fix it.

 

Also, we have to add `String quorum` to cases in 
DeleteOffsetsConsumerGroupCommandIntegrationTest.
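
For context, a minimal sketch of the kind of check involved, modeled loosely
on TestInfoUtils#isKRaft (the exact implementation may differ):

```scala
import org.junit.jupiter.api.TestInfo

object QuorumCheckSketch {
  // True only when the rendered display name contains "quorum=kraft".
  // Without javac '-parameters', JUnit cannot recover the parameter name in
  // Java tests, so the display name omits "quorum=" and this check always
  // returns false, silently running the non-kraft path.
  def isKRaft(testInfo: TestInfo): Boolean =
    testInfo.getDisplayName.contains("quorum=kraft")
}
```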





[jira] [Commented] (KAFKA-16472) Integration tests in Java don't really run kraft case

2024-04-04 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833985#comment-17833985
 ] 

Chia-Ping Tsai commented on KAFKA-16472:


[~brandboat] Could you share the junit 5 details we discussed offline?

> Integration tests in Java don't really run kraft case
> -
>
> Key: KAFKA-16472
> URL: https://issues.apache.org/jira/browse/KAFKA-16472
> Project: Kafka
>  Issue Type: Test
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>
> Following test cases don't really run kraft case. The reason is that the test 
> info doesn't contain parameter name, so it always returns false in 
> TestInfoUtils#isKRaft.
>  * TopicCommandIntegrationTest
>  * DeleteConsumerGroupsTest
>  * AuthorizerIntegrationTest
>  * DeleteOffsetsConsumerGroupCommandIntegrationTest
>  
> We can add `options.compilerArgs += '-parameters'` after 
> [https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273]
>  to fix it.
>  
> Also, we have to add `String quorum` to cases in 
> DeleteOffsetsConsumerGroupCommandIntegrationTest.





[jira] [Created] (KAFKA-16473) KafkaDockerWrapper uses wrong cluster ID when formatting log dir

2024-04-04 Thread Sebastian Marsching (Jira)
Sebastian Marsching created KAFKA-16473:
---

 Summary: KafkaDockerWrapper uses wrong cluster ID when formatting 
log dir
 Key: KAFKA-16473
 URL: https://issues.apache.org/jira/browse/KAFKA-16473
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Sebastian Marsching


There is a bug in {{KafkaDockerWrapper}} that causes {{Some()}} to be used as
the cluster ID when formatting the log dir when Kafka is started for the first
time inside a Docker container.

More specifically, the problem is in {{formatStorageCmd}}: the code uses
{{env.get("CLUSTER_ID")}}, but this returns an {{Option}}, not a {{String}}.

The code should instead check whether the environment variable is set, raising
an exception if it is not.
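
A self-contained illustration of the reported mistake; the names are
schematic, not the exact KafkaDockerWrapper code:

```scala
object ClusterIdSketch extends App {
  val env: Map[String, String] = Map("CLUSTER_ID" -> "abc123")

  // Bug pattern: Map#get returns Option[String], so interpolating it
  // directly yields the literal text "Some(abc123)" in the format command.
  println(s"--cluster-id ${env.get("CLUSTER_ID")}") // --cluster-id Some(abc123)

  // Suggested shape: fail fast when the variable is missing.
  val clusterId = env.getOrElse("CLUSTER_ID",
    throw new IllegalArgumentException("CLUSTER_ID environment variable is not set"))
  println(s"--cluster-id $clusterId") // --cluster-id abc123
}
```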





[jira] [Commented] (KAFKA-16472) Integration tests in Java don't really run kraft case

2024-04-04 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833988#comment-17833988
 ] 

Kuan Po Tseng commented on KAFKA-16472:
---

Looks like in this line,
[https://github.com/junit-team/junit5/blob/4c0dddad1b96d4a20e92a2cd583954643ac56ac0/junit-jupiter-params/src/main/java/org/junit/jupiter/params/ParameterizedTestNameFormatter.java#L93],
the Scala tests can get the correct parameter name (i.e. quorum) back, while
the Java tests only get Optional.empty. As PoAn pointed out, adding the
compiler arg {{-parameters}} can solve the missing parameter name in the Java
tests.

> Integration tests in Java don't really run kraft case
> -
>
> Key: KAFKA-16472
> URL: https://issues.apache.org/jira/browse/KAFKA-16472
> Project: Kafka
>  Issue Type: Test
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>
> Following test cases don't really run kraft case. The reason is that the test 
> info doesn't contain parameter name, so it always returns false in 
> TestInfoUtils#isKRaft.
>  * TopicCommandIntegrationTest
>  * DeleteConsumerGroupsTest
>  * AuthorizerIntegrationTest
>  * DeleteOffsetsConsumerGroupCommandIntegrationTest
>  
> We can add `options.compilerArgs += '-parameters'` after 
> [https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273]
>  to fix it.
>  
> Also, we have to add `String quorum` to cases in 
> DeleteOffsetsConsumerGroupCommandIntegrationTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16472) Integration tests in Java don't really run kraft case

2024-04-04 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833990#comment-17833990
 ] 

Kuan Po Tseng commented on KAFKA-16472:
---

And the Javadoc in JUnit 5 also mentions this: 
https://github.com/junit-team/junit5/blob/4c0dddad1b96d4a20e92a2cd583954643ac56ac0/junit-jupiter-params/src/main/java/org/junit/jupiter/params/ParameterizedTest.java#L163

> Integration tests in Java don't really run kraft case
> -
>
> Key: KAFKA-16472
> URL: https://issues.apache.org/jira/browse/KAFKA-16472
> Project: Kafka
>  Issue Type: Test
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>
> Following test cases don't really run kraft case. The reason is that the test 
> info doesn't contain parameter name, so it always returns false in 
> TestInfoUtils#isKRaft.
>  * TopicCommandIntegrationTest
>  * DeleteConsumerGroupsTest
>  * AuthorizerIntegrationTest
>  * DeleteOffsetsConsumerGroupCommandIntegrationTest
>  
> We can add `options.compilerArgs += '-parameters'` after 
> [https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273]
>  to fix it.
>  
> Also, we have to add `String quorum` to cases in 
> DeleteOffsetsConsumerGroupCommandIntegrationTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16474) AsyncKafkaConsumer might rapidly send out successive heartbeat causing partitions getting revoked

2024-04-04 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16474:
--

 Summary: AsyncKafkaConsumer might rapidly send out successive 
heartbeat causing partitions getting revoked
 Key: KAFKA-16474
 URL: https://issues.apache.org/jira/browse/KAFKA-16474
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


KAFKA-16389

We've discovered that in some uncommon cases, the consumer could send out 
successive heartbeats without waiting for the response to come back. This 
might cause the consumer to revoke its just-assigned partitions in some 
cases. For example:

The consumer first sends out a heartbeat with epoch=0 and memberId=''.

The consumer then rapidly sends out another heartbeat with epoch=0 and 
memberId='', because it has not yet received a response and thus has not 
updated its local state.

The consumer receives an assignment from the first heartbeat and reconciles 
it.

Since the second heartbeat has epoch=0 and memberId='', the server will think 
this is a new member joining and therefore sends out an empty assignment.

The consumer receives the response to the second heartbeat and revokes all of 
its partitions.

There are two issues associated with this bug (see the sketch below):
 # inflight logic
 # rapid poll: in KAFKA-16389 we've observed the consumer polling interval to 
be a few ms.
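
A minimal sketch of the kind of in-flight guard item 1 refers to (hypothetical 
names, not the actual AsyncKafkaConsumer code; it assumes a single-threaded 
caller such as the consumer's background thread):

```scala
import java.util.concurrent.CompletableFuture

// At most one heartbeat may be outstanding, so the response (and the epoch
// and memberId it carries) is applied to local state before the next
// request can be built.
class HeartbeatGuard(send: () => CompletableFuture[Void]) {
  @volatile private var inFlight = false

  def maybeSend(): Boolean =
    if (inFlight) false // previous heartbeat not answered yet, skip this one
    else {
      inFlight = true
      send().whenComplete((_, _) => inFlight = false)
      true
    }
}
```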



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2037601997

   Merging trunk to trigger QA again. Also, the error seems to happen due to 
an unchanged leader; will check it later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might rapidly send out successive heartbeat without waiting for the response to come back

2024-04-04 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16474:
---
Summary: AsyncKafkaConsumer might rapidly send out successive heartbeat 
without waiting for the response to come back  (was: AsyncKafkaConsumer might 
rapidly send out successive heartbeat causing partitions getting revoked)

> AsyncKafkaConsumer might rapidly send out successive heartbeat without 
> waiting for the response to come back
> -
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-04-04 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16474:
---
Summary: AsyncKafkaConsumer might send out heartbeat request without 
waiting for its response  (was: AsyncKafkaConsumer might rapidly send out 
successive heartbeat without waiting for the response to come back)

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16473) KafkaDockerWrapper uses wrong cluster ID when formatting log dir

2024-04-04 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833997#comment-17833997
 ] 

Kuan Po Tseng commented on KAFKA-16473:
---

Thank you for the detailed description. Are you planning to address this issue? 
If not, I'd be happy to take care of it.

> KafkaDockerWrapper uses wrong cluster ID when formatting log dir
> 
>
> Key: KAFKA-16473
> URL: https://issues.apache.org/jira/browse/KAFKA-16473
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Sebastian Marsching
>Priority: Major
>
> There is a bug in {{{}KafkaDockerWrapper{}}}, that causes {{Some(<content of 
> the CLUSTER_ID environment variable>)}} to be used when formatting the log dir 
> when Kafka is started for the first time inside a Docker container.
> More specifically, the problem is in {{{}formatStorageCmd{}}}: The code uses 
> {{{}env.get("CLUSTER_ID"){}}}, but this returns an {{Option}} not a 
> {{{}String{}}}.
> The code should instead check whether the environment variable is set, 
> raising an exception if it is not set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16473) KafkaDockerWrapper uses wrong cluster ID when formatting log dir

2024-04-04 Thread Sebastian Marsching (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833999#comment-17833999
 ] 

Sebastian Marsching commented on KAFKA-16473:
-

I am currently working on a PR that should address this issue. I would 
appreciate a review of that PR, though.

> KafkaDockerWrapper uses wrong cluster ID when formatting log dir
> 
>
> Key: KAFKA-16473
> URL: https://issues.apache.org/jira/browse/KAFKA-16473
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Sebastian Marsching
>Priority: Major
>
> There is a bug in {{{}KafkaDockerWrapper{}}}, that causes {{Some(<content of 
> the CLUSTER_ID environment variable>)}} to be used when formatting the log dir 
> when Kafka is started for the first time inside a Docker container.
> More specifically, the problem is in {{{}formatStorageCmd{}}}: The code uses 
> {{{}env.get("CLUSTER_ID"){}}}, but this returns an {{Option}} not a 
> {{{}String{}}}.
> The code should instead check whether the environment variable is set, 
> raising an exception if it is not set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16473) KafkaDockerWrapper uses wrong cluster ID when formatting log dir

2024-04-04 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834001#comment-17834001
 ] 

Kuan Po Tseng commented on KAFKA-16473:
---

> I would appreciate a review of that PR, though.

Sure, I'll do that. Thanks for your input!

> KafkaDockerWrapper uses wrong cluster ID when formatting log dir
> 
>
> Key: KAFKA-16473
> URL: https://issues.apache.org/jira/browse/KAFKA-16473
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Sebastian Marsching
>Priority: Major
>
> There is a bug in {{{}KafkaDockerWrapper{}}}, that causes {{Some(<content of 
> the CLUSTER_ID environment variable>)}} to be used when formatting the log dir 
> when Kafka is started for the first time inside a Docker container.
> More specifically, the problem is in {{{}formatStorageCmd{}}}: The code uses 
> {{{}env.get("CLUSTER_ID"){}}}, but this returns an {{Option}} not a 
> {{{}String{}}}.
> The code should instead check whether the environment variable is set, 
> raising an exception if it is not set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1552022713


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -189,14 +220,49 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   }
 
   private def verifyListOffsets(topic: String = topicName, 
expectedMaxTimestampOffset: Int = 1): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
-assertEquals(0, earliestOffset.offset())
+def check(): Unit = {
+  val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
+  assertEquals(0, earliestOffset.offset())
 
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-assertEquals(3, latestOffset.offset())
+  val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), 
topic)
+  assertEquals(3, latestOffset.offset())
+
+  val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
+  assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+}
 
-val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
-assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+// case 0: test the offsets from leader's append path
+check()
+
+// case 1: test the offsets from follower's append path.
+// we make a follower be the new leader to handle the ListOffsetRequest
+val partitionAssignment = 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0)
+val newLeader = brokers.map(_.config.brokerId).find(_ != 
partitionAssignment.leader().id()).get
+
adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new 
TopicPartition(topic, 0),
+  Optional.of(new 
NewPartitionReassignment(java.util.Arrays.asList(newLeader).all().get()
+waitForAllReassignmentsToComplete(adminClient)
+TestUtils.waitUntilTrue(() => newLeader == 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0).leader().id(), 
"expected leader: " + newLeader
+  + ", but actual leader: " + 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0).leader().id())

Review Comment:
   This is a bit hard to read now. Could we do `adminClient.describeTopics` 
once and reuse the result?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]

2024-04-04 Thread via GitHub


Joker-5 commented on code in PR #15642:
URL: https://github.com/apache/kafka/pull/15642#discussion_r1552049826


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java:
##
@@ -69,6 +69,9 @@ public static void setup() {
 Map workerProps = new HashMap<>();
 workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID);
 
+// Work around a circular-dependency in TestPlugins.
+TestPlugins.pluginPath();

Review Comment:
   > Thanks for the explanation. hmm I ran the tests in 
`ConnectorValidationIntergationTest` without this one line but including all 
others and the tests are passing for me locally. I created a branch (draft, we 
can close it once we have validated the behaviour) #15653 with the changes. We 
can monitor if the circular dependency related errors show up there or not.
   
   @vamossagar12, I checked the result from #15653, which did not have the 
related errors, so I removed this one line.
   Maybe there's some problem in my local env; I will try to investigate the 
root cause :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] Add type check to classic group timeout operations [kafka]

2024-04-04 Thread via GitHub


dongnuo123 commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1552050811


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2451,6 +2451,8 @@ private CoordinatorResult 
completeClassicGroupJoin(
 
 if (group.isInState(DEAD)) {
 log.info("Group {} is dead, skipping rebalance stage.", groupId);
+} else if (!containsClassicGroup(group.groupId())) {
+log.info("Group {} is null or not a classic group, skipping 
rebalance stage.", groupId);

Review Comment:
   Yeah, I agree. I'll try to change the params and overload them if they are 
also called in the regular context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [No Review] Kafka-14563 client side. [kafka]

2024-04-04 Thread via GitHub


CalvinConfluent opened a new pull request, #15657:
URL: https://github.com/apache/kafka/pull/15657

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16473: Use correct cluster ID when formatting log dir. [kafka]

2024-04-04 Thread via GitHub


smarsching opened a new pull request, #15658:
URL: https://github.com/apache/kafka/pull/15658

   This fixes an issue where, when starting a Docker container for the first 
time, the cluster ID used when formatting the log dir would not be 
`$CLUSTER_ID` but `Some($CLUSTER_ID)` (KAFKA-16473).
   
   In order to be able to test the `formatStorageCmd` method, which contained 
the bug, the method has been made package private.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2037697246

   > 
(kafka.admin.ListOffsetsIntegrationTest.testThreeCompressedRecordsInSeparateBatch(String).quorum=kraft)
 failed with the following.
   
   I'm trying to reproduce it locally :(


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16473) KafkaDockerWrapper uses wrong cluster ID when formatting log dir

2024-04-04 Thread Sebastian Marsching (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834021#comment-17834021
 ] 

Sebastian Marsching commented on KAFKA-16473:
-

[~brandboat] The PR is now available at 
[https://github.com/apache/kafka/pull/15658].

> KafkaDockerWrapper uses wrong cluster ID when formatting log dir
> 
>
> Key: KAFKA-16473
> URL: https://issues.apache.org/jira/browse/KAFKA-16473
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Sebastian Marsching
>Priority: Major
>
> There is a bug in {{{}KafkaDockerWrapper{}}}, that causes {{Some(<content of 
> the CLUSTER_ID environment variable>)}} to be used when formatting the log dir 
> when Kafka is started for the first time inside a Docker container.
> More specifically, the problem is in {{{}formatStorageCmd{}}}: The code uses 
> {{{}env.get("CLUSTER_ID"){}}}, but this returns an {{Option}} not a 
> {{{}String{}}}.
> The code should instead check whether the environment variable is set, 
> raising an exception if it is not set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]

2024-04-04 Thread via GitHub


brandboat opened a new pull request, #15659:
URL: https://github.com/apache/kafka/pull/15659

   related to KAFKA-16455
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16471: invoke SSLEngine::closeInbound on SslTransportLayer close [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on code in PR #15655:
URL: https://github.com/apache/kafka/pull/15655#discussion_r1552096474


##
clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java:
##
@@ -1528,4 +1530,29 @@ public void 
testHandshakeUnwrapContinuesUnwrappingOnNeedUnwrapAfterAllBytesRead(
 assertEquals(SSLEngineResult.Status.OK, result.getStatus());
 assertEquals(SSLEngineResult.HandshakeStatus.NEED_WRAP, 
result.getHandshakeStatus());
 }
+
+@Test
+public void testSSLEngineCloseInboundInvokedOnClose() throws IOException {
+// Given
+SSLEngine sslEngine = mock(SSLEngine.class);

Review Comment:
   Should we mock `closeInbound` to throw `SSLException` to test that we do 
catch the exception?
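
   A rough sketch of the suggested stubbing (Scala Mockito syntax; a 
hypothetical test fragment, not the committed code):

```scala
import javax.net.ssl.{SSLEngine, SSLException}
import org.mockito.Mockito.{doThrow, mock}

object CloseInboundStubSketch {
  val sslEngine: SSLEngine = mock(classOf[SSLEngine])
  // closeInbound() throws when the peer never sent close_notify; stub it so
  // the test can assert that the transport layer's close() swallows the error.
  doThrow(new SSLException("closing inbound before receiving peer's close_notify"))
    .when(sslEngine).closeInbound()
}
```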



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16473: Use correct cluster ID when formatting log dir. [kafka]

2024-04-04 Thread via GitHub


brandboat commented on code in PR #15658:
URL: https://github.com/apache/kafka/pull/15658#discussion_r1552104493


##
core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
##
@@ -87,8 +87,12 @@ object KafkaDockerWrapper {
 parser.parseArgsOrFail(args)
   }
 
-  private def formatStorageCmd(configsPath: Path, env: Map[String, String]): 
Array[String] = {
-Array("format", "--cluster-id=" + env.get("CLUSTER_ID"), "-c", 
s"${configsPath.toString}/server.properties")
+  private[docker] def formatStorageCmd(configsPath: Path, env: Map[String, 
String]): Array[String] = {
+val clusterId = env.get("CLUSTER_ID") match {
+  case Some(str) => str
+  case None => throw new RuntimeException("CLUSTER_ID environment variable 
is not set.")

Review Comment:
   Could you add a test for this in `testFormatStorageCmd` ? `assertThrows...`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16471: invoke SSLEngine::closeInbound on SslTransportLayer close [kafka]

2024-04-04 Thread via GitHub


gaurav-narula commented on code in PR #15655:
URL: https://github.com/apache/kafka/pull/15655#discussion_r1552109605


##
clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java:
##
@@ -1528,4 +1530,29 @@ public void 
testHandshakeUnwrapContinuesUnwrappingOnNeedUnwrapAfterAllBytesRead(
 assertEquals(SSLEngineResult.Status.OK, result.getStatus());
 assertEquals(SSLEngineResult.HandshakeStatus.NEED_WRAP, 
result.getHandshakeStatus());
 }
+
+@Test
+public void testSSLEngineCloseInboundInvokedOnClose() throws IOException {
+// Given
+SSLEngine sslEngine = mock(SSLEngine.class);

Review Comment:
   Thanks for the feedback! Addressed in 
3eb2e99d91d51b313b5b224c9aa7dced1097fd30



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: disable internal result emit throttling in TTD [kafka]

2024-04-04 Thread via GitHub


mjsax opened a new pull request, #15660:
URL: https://github.com/apache/kafka/pull/15660

   Kafka Streams DSL operators use internal wall-clock based throttling 
parameters for performance reasons. These configs make the usage of TTD 
difficult: users need to advance the mocked wall-clock time in their test code, 
or set these internal configs to zero.
   
To simplify testing, TTD should disable both configs automatically.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16475) Create unit test for TopicImageNode

2024-04-04 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-16475:


 Summary: Create unit test for TopicImageNode
 Key: KAFKA-16475
 URL: https://issues.apache.org/jira/browse/KAFKA-16475
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Add type check to classic group timeout operations [kafka]

2024-04-04 Thread via GitHub


dongnuo123 commented on PR #15587:
URL: https://github.com/apache/kafka/pull/15587#issuecomment-2037838999

   Need to add unit tests for the change when the group upgrade/downgrade 
conversion is finished.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-04 Thread via GitHub


dongnuo123 commented on PR #15411:
URL: https://github.com/apache/kafka/pull/15411#issuecomment-2037835825

   Will need to add unit tests for the change when group upgrade/downgrade 
conversion is finished.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16473: Use correct cluster ID when formatting log dir. [kafka]

2024-04-04 Thread via GitHub


smarsching commented on code in PR #15658:
URL: https://github.com/apache/kafka/pull/15658#discussion_r1552171112


##
core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
##
@@ -87,8 +87,12 @@ object KafkaDockerWrapper {
 parser.parseArgsOrFail(args)
   }
 
-  private def formatStorageCmd(configsPath: Path, env: Map[String, String]): 
Array[String] = {
-Array("format", "--cluster-id=" + env.get("CLUSTER_ID"), "-c", 
s"${configsPath.toString}/server.properties")
+  private[docker] def formatStorageCmd(configsPath: Path, env: Map[String, 
String]): Array[String] = {
+val clusterId = env.get("CLUSTER_ID") match {
+  case Some(str) => str
+  case None => throw new RuntimeException("CLUSTER_ID environment variable 
is not set.")

Review Comment:
   Sure, I just force-pushed a commit that adds this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-04-04 Thread via GitHub


junrao commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1552178574


##
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##
@@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest {
  */
 public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
 
+public static final long LATEST_TIERED_TIMESTAMP = -5L;

Review Comment:
   Typically, if we add a new targetTimestamp value, we will need to bump up 
the version of the ListOffsetsRequest. See 
https://github.com/apache/kafka/pull/10760/files. Otherwise, a client could be 
setting LATEST_TIERED_TIMESTAMP and assuming that the server supports it, but 
the server actually does not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] kip 966 unclean leader election dynamic configs [kafka]

2024-04-04 Thread via GitHub


cmccabe commented on PR #15603:
URL: https://github.com/apache/kafka/pull/15603#issuecomment-2037928952

   I think the very first thing to do is to implement an enum for the new 
"recovery strategy" thing, and replace all the cases where we're using a 
boolean with this enum. So the boolean needs to go away. You also need to 
handle overrides (what happens if both strategy and 
unclean.leader.election.enable are set?). I think strategy takes priority.
   
   So a simple thing would be to just implement the "never" and "always" 
strategies for now, and not allow the middle setting.
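   
   A rough sketch of what such an enum and override resolution might look like 
(the names here are assumptions, and only the two endpoint strategies are 
shown):

```scala
// Hypothetical replacement for the unclean-leader-election boolean.
sealed trait LeaderRecoveryStrategy
object LeaderRecoveryStrategy {
  case object Never  extends LeaderRecoveryStrategy // clean elections only
  case object Always extends LeaderRecoveryStrategy // allow unclean election

  // The strategy config, when set, takes priority over
  // unclean.leader.election.enable.
  def resolve(strategy: Option[LeaderRecoveryStrategy],
              uncleanLeaderElectionEnable: Boolean): LeaderRecoveryStrategy =
    strategy.getOrElse(if (uncleanLeaderElectionEnable) Always else Never)
}
```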


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add GetReplicaLogInfo API server side support [kafka]

2024-04-04 Thread via GitHub


cmccabe commented on PR #15604:
URL: https://github.com/apache/kafka/pull/15604#issuecomment-2037940819

   This is not bad but does the RPC show up on the server yet? We can't add it 
until it's implemented.
   
   I think you should set `latestVersionUnstable: "true"` and then maybe we 
could make progress by committing this ...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1552264764


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -483,12 +484,13 @@ public int recover(ProducerStateManager 
producerStateManager, Optional maxTimestampSoFar()) {
 maxTimestampAndOffsetSoFar = new 
TimestampOffset(batch.maxTimestamp(), batch.lastOffset());
+System.out.println("[CHIA] recovery: " + 
maxTimestampAndOffsetSoFar);

Review Comment:
   Is this just for testing?



##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -189,14 +215,52 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   }
 
   private def verifyListOffsets(topic: String = topicName, 
expectedMaxTimestampOffset: Int = 1): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
-assertEquals(0, earliestOffset.offset())
+def check(): Unit = {
+  val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
+  assertEquals(0, earliestOffset.offset())
 
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-assertEquals(3, latestOffset.offset())
+  val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), 
topic)
+  assertEquals(3, latestOffset.offset())
+
+  val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
+  assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+}
 
-val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
-assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+// case 0: test the offsets from leader's append path
+check()
+
+// case 1: test the offsets from follower's append path.
+// we make a follower be the new leader to handle the ListOffsetRequest
+def leader(): Int = 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0).leader().id()
+
+val previousLeader = leader()
+val newLeader = brokers.map(_.config.brokerId).find(_ != 
previousLeader).get
+
+// change the leader to new one
+
adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new 
TopicPartition(topic, 0),
+  Optional.of(new 
NewPartitionReassignment(java.util.Arrays.asList(newLeader).all().get()
+// wait for all reassignments get completed
+waitForAllReassignmentsToComplete(adminClient)
+// make sure we are able to see the new leader
+TestUtils.waitUntilTrue(() => newLeader == leader(), s"expected leader: 
$newLeader but actual: ${leader()}")

Review Comment:
   The leader could change in the error message by calling leader() again. 
Could we save the last leader and use that in the error message?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2038064405

   @junrao thanks for the reviews. I have removed the useless log and revised 
the test. Let's see what happens.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1552358023


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -189,14 +215,56 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   }
 
   private def verifyListOffsets(topic: String = topicName, 
expectedMaxTimestampOffset: Int = 1): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
-assertEquals(0, earliestOffset.offset())
+def check(): Unit = {
+  val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
+  assertEquals(0, earliestOffset.offset())
 
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-assertEquals(3, latestOffset.offset())
+  val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), 
topic)
+  assertEquals(3, latestOffset.offset())
+
+  val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
+  assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+}
 
-val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
-assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+// case 0: test the offsets from leader's append path
+check()
+
+// case 1: test the offsets from follower's append path.
+// we make a follower be the new leader to handle the ListOffsetRequest
+def leader(): Int = 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0).leader().id()
+
+val previousLeader = leader()
+val newLeader = brokers.map(_.config.brokerId).find(_ != 
previousLeader).get
+
+// change the leader to new one
+
adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new 
TopicPartition(topic, 0),
+  Optional.of(new 
NewPartitionReassignment(java.util.Arrays.asList(newLeader).all().get()
+// wait for all reassignments get completed
+waitForAllReassignmentsToComplete(adminClient)
+// make sure we are able to see the new leader
+var lastLeader = leader()

Review Comment:
   It seems that we could just initialize it with -1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-04 Thread via GitHub


cadonna commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1552378325


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -309,49 +300,49 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldNotAttemptToLockIfNoStores() {
-stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-EasyMock.replay(stateDirectory);
+stateDirectory = mock(StateDirectory.class);
 
 task = createStatelessTask(createConfig("100"));
 
 task.initializeIfNeeded();
 
 // should fail if lock is called
-EasyMock.verify(stateDirectory);
+verify(stateDirectory, never()).lock(any());
 }
 
 @Test
 public void 
shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws 
IOException {
-final IMocksControl ctrl = EasyMock.createStrictControl();
-final ProcessorStateManager stateManager = 
ctrl.createMock(ProcessorStateManager.class);
-
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
-stateDirectory = ctrl.createMock(StateDirectory.class);
+final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
+when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+stateDirectory = mock(StateDirectory.class);
 
-stateManager.registerGlobalStateStores(emptyList());
-EasyMock.expectLastCall();
+doNothing().when(stateManager).registerGlobalStateStores(emptyList());

Review Comment:
   Do we really need this stub?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -309,49 +300,49 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldNotAttemptToLockIfNoStores() {
-stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-EasyMock.replay(stateDirectory);
+stateDirectory = mock(StateDirectory.class);
 
 task = createStatelessTask(createConfig("100"));
 
 task.initializeIfNeeded();
 
 // should fail if lock is called
-EasyMock.verify(stateDirectory);
+verify(stateDirectory, never()).lock(any());
 }
 
 @Test
 public void 
shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws 
IOException {
-final IMocksControl ctrl = EasyMock.createStrictControl();
-final ProcessorStateManager stateManager = 
ctrl.createMock(ProcessorStateManager.class);
-
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
-stateDirectory = ctrl.createMock(StateDirectory.class);
+final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
+when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+stateDirectory = mock(StateDirectory.class);
 
-stateManager.registerGlobalStateStores(emptyList());
-EasyMock.expectLastCall();
+doNothing().when(stateManager).registerGlobalStateStores(emptyList());
 
-EasyMock.expect(stateManager.taskId()).andReturn(taskId);
+when(stateManager.taskId()).thenReturn(taskId);
 
-EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 
-stateManager.close();
-EasyMock.expectLastCall();
+doNothing().when(stateManager).close();

Review Comment:
   Do we really need this stub?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -309,49 +300,49 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldNotAttemptToLockIfNoStores() {
-stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-EasyMock.replay(stateDirectory);
+stateDirectory = mock(StateDirectory.class);
 
 task = createStatelessTask(createConfig("100"));
 
 task.initializeIfNeeded();
 
 // should fail if lock is called
-EasyMock.verify(stateDirectory);
+verify(stateDirectory, never()).lock(any());
 }
 
 @Test
 public void 
shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws 
IOException {
-final IMocksControl ctrl = EasyMock.createStrictControl();
-final ProcessorStateManager stateManager = 
ctrl.createMock(ProcessorStateManager.class);
-
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
-stateDirectory = ctrl.createMock(StateDirectory.class);
+final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
+when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+stateDirectory = mock(StateDirectory.class);
 
-stateManager.regi

[PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]

2024-04-04 Thread via GitHub


philipnee opened a new pull request, #15661:
URL: https://github.com/apache/kafka/pull/15661

   The current AssignmentValidationTest only tests the EAGER assignment 
protocol and does not cover incremental assignment, as used by 
CooperativeStickyAssignor and the consumer protocol. Therefore, in the 
ConsumerEventHandler, I subclassed the existing handler and overrode the 
assignment and revocation event handling methods to permit incremental changes 
to the current assignments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]

2024-04-04 Thread via GitHub


philipnee commented on PR #15661:
URL: https://github.com/apache/kafka/pull/15661#issuecomment-2038186018

   @kirktrue - This inherits from the patch you attached to the jira ticket.  
Would you have time to review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-04 Thread via GitHub


dongnuo123 opened a new pull request, #15662:
URL: https://github.com/apache/kafka/pull/15662

   Based on https://github.com/apache/kafka/pull/15411, this PR contains the 
triggering of group conversion from a classic group to a consumer group.
   
   In consumerGroupHeartbeat, the online migration will be triggered if a 
joining request (one whose member epoch is 0) to a non-empty classic group 
is received. The converting method will create a new consumer group based on 
the existing classic group. The new consumer group's reference will be used 
in the following member joining process but will not be persisted to the 
timeline data structure until the records are replayed.
   
   A special case to consider is when the replay of the conversion and joining 
records fails. A possible solution is discussed in 
https://github.com/apache/kafka/pull/15587.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16460) New consumer times out consuming records in consumer_test.py system test

2024-04-04 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834082#comment-17834082
 ] 

Kirk True commented on KAFKA-16460:
---

Good or bad, the test actually passes a good fraction of the time.

> New consumer times out consuming records in consumer_test.py system test
> 
>
> Key: KAFKA-16460
> URL: https://issues.apache.org/jira/browse/KAFKA-16460
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with the following errors:
> {quote}
> * Timed out waiting for consumption
> {quote}
> Affected tests:
> * {{test_broker_failure}}
> * {{test_consumer_bounce}}
> * {{test_static_consumer_bounce}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]

2024-04-04 Thread via GitHub


philipnee commented on PR #15661:
URL: https://github.com/apache/kafka/pull/15661#issuecomment-2038263452

   Thanks for reviewing this.  This is the test results:
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.4
   session_id:   2024-04-04--035
   run time: 9 minutes 46.055 seconds
   tests run:14
   passed:   14
   flaky:0
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   43.462 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
   status: PASS
   run time:   42.289 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   45.432 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RangeAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   37.334 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RangeAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
   status: PASS
   run time:   36.570 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RangeAssignor.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   39.302 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   36.570 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
   status: PASS
   run time:   36.298 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   39.076 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.StickyAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   35.942 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.StickyAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
   status: PASS
   run time:   36.115 seconds
   

   test_id:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.StickyAssignor.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   39.770 seconds
   
-

Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]

2024-04-04 Thread via GitHub


philipnee commented on PR #15661:
URL: https://github.com/apache/kafka/pull/15661#issuecomment-2038266652

   Can we change the message to: `err_msg="expecting valid assignments of %d 
partitions for node %d but got: %s" % \`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-8735: Check properties file existence first [kafka]

2024-04-04 Thread via GitHub


thaumatoast commented on PR #7139:
URL: https://github.com/apache/kafka/pull/7139#issuecomment-2038272787

   I'm currently facing this issue (and not mixing versions). This appears to 
be a bug regardless of version mixing; is there a reason not to merge this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16225 Flaky test suite LogDirFailureTest#testIOExceptionDuringLogRoll [kafka]

2024-04-04 Thread via GitHub


gaurav-narula commented on PR #15637:
URL: https://github.com/apache/kafka/pull/15637#issuecomment-2038293124

   I think this might be due to KAFKA-16234, which has a PR at 
https://github.com/apache/kafka/pull/15335


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16472) Integration tests in Java don't really run kraft case

2024-04-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834106#comment-17834106
 ] 

Luke Chen commented on KAFKA-16472:
---

[~yangpoan], great find! But how can you confirm that only these 4 test suites 
are impacted, and not others?

> Integration tests in Java don't really run kraft case
> -
>
> Key: KAFKA-16472
> URL: https://issues.apache.org/jira/browse/KAFKA-16472
> Project: Kafka
>  Issue Type: Test
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>
> Following test cases don't really run kraft case. The reason is that the test 
> info doesn't contain parameter name, so it always returns false in 
> TestInfoUtils#isKRaft.
>  * TopicCommandIntegrationTest
>  * DeleteConsumerGroupsTest
>  * AuthorizerIntegrationTest
>  * DeleteOffsetsConsumerGroupCommandIntegrationTest
>  
> We can add `options.compilerArgs += '-parameters'` after 
> [https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273]
>  to fix it.
>  
> Also, we have to add `String quorum` to cases in 
> DeleteOffsetsConsumerGroupCommandIntegrationTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-04-04 Thread via GitHub


showuon commented on PR #15335:
URL: https://github.com/apache/kafka/pull/15335#issuecomment-2038496756

   @OmniaGM, there is a compilation error in the jdk8_scala2.12 job. Could you 
have a look?
   ```
   [2024-04-04T09:19:51.266Z] [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4952:125:
 recursive value leaderTopicsDelta needs type
   [2024-04-04T09:19:51.266Z] [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4953:17:
 value makeLeader is not a member of Any
   [2024-04-04T09:19:51.266Z] [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4957:7:
 overloaded method value assertTrue with alternatives:
   [2024-04-04T09:19:51.266Z]   (x$1: java.util.function.BooleanSupplier)Unit 

   [2024-04-04T09:19:51.266Z]   (x$1: Boolean)Unit
   [2024-04-04T09:19:51.266Z]  cannot be applied to (Any)
   ```
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15335/13/pipeline





[PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]

2024-04-04 Thread via GitHub


FrankYang0529 opened a new pull request, #15663:
URL: https://github.com/apache/kafka/pull/15663

   The following test cases don't really run the kraft case. The reason is 
that the test info doesn't contain the parameter name, so TestInfoUtils#isKRaft 
always returns false.
   
   - TopicCommandIntegrationTest
   - DeleteConsumerGroupsTest
   - AuthorizerIntegrationTest
   - DeleteOffsetsConsumerGroupCommandIntegrationTest
   
   We can add `options.compilerArgs << '-parameters'` after 
https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273
 to fix it.
   
   Also, we have to add `String quorum` to cases in 
DeleteOffsetsConsumerGroupCommandIntegrationTest.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16262) Add IQv2 to Kafka Streams documentation

2024-04-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834123#comment-17834123
 ] 

Matthias J. Sax commented on KAFKA-16262:
-

Of course you can work on this ticket.

Did you already read [https://kafka.apache.org/contributing] as a starting 
point?

For this ticket, you would need to do a PR on Github. The docs are in the Kafka 
repo: [https://github.com/apache/kafka/tree/trunk/docs]

Are you familiar with IQv2 and IQv1 and can extend the docs to explain IQv2?

> Add IQv2 to Kafka Streams documentation
> ---
>
> Key: KAFKA-16262
> URL: https://issues.apache.org/jira/browse/KAFKA-16262
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, newbie
>
> The new IQv2 API was added many releases ago. While it is still not feature 
> complete, we should add it to the docs 
> ([https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html])
>  to make users aware of the new API so they can start to try it out, report 
> issues and provide feedback / feature requests.
> We might still state that IQv2 is not yet feature complete, but should change 
> the docs in a way to position it as the "new API", and have code examples.



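As a starting point for such an example, here is a minimal IQv2 sketch (the store name `counts-store` and the key/value types are illustrative assumptions): where IQv1 hands out typed store wrappers via `KafkaStreams#store`, IQv2 routes every lookup through a single `query` entry point with typed query objects such as `KeyQuery`:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

public class Iqv2Sketch {
    // Point lookup against a key-value state store using the IQv2 API.
    static Long lookupCount(KafkaStreams streams, String key) {
        StateQueryRequest<Long> request = StateQueryRequest
            .inStore("counts-store")                      // assumed store name
            .withQuery(KeyQuery.<String, Long>withKey(key));
        StateQueryResult<Long> result = streams.query(request);
        QueryResult<Long> partitionResult = result.getOnlyPartitionResult();
        return partitionResult == null ? null : partitionResult.getResult();
    }
}
```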


[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-04-04 Thread HiroArai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834129#comment-17834129
 ] 

HiroArai commented on KAFKA-16310:
--

[~chia7712] [~omkreddy] 

>  [~ijuma] Are you worry about the 3.6.2? If so, the fixes are reverted 
>already. see

[https://home.apache.org/~manikumar/kafka-3.6.2-rc2/RELEASE_NOTES.html]

This is my first time commenting on Kafka's Jira, so I apologize if this is an 
unnecessary question.

The fix for this bug is not mentioned in the release notes for 3.6.2-rc2.

Will this bug not be fixed in 3.6.2?

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset returned for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned instead). It causes the time index to record the wrong offset, so 
> the result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but 3.7.0 and trunk are returning offset 2.
> Even 5 seconds after producing, it's still returning 2 as the offset with 
> the max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



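For reference, the same check can be reproduced with the Java AdminClient; here is a minimal sketch (topic name and bootstrap address are placeholders) of what it looks like after producing the three records above at offsets 0, 1, 2. Per the report, it should print offset 1 (the t0 + 400 record), while the regression returns offset 2:

```java
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class MaxTimestampSketch {
    public static void main(String[] args) throws Exception {
        TopicPartition tp = new TopicPartition("test-topic", 0); // placeholder
        try (Admin admin = Admin.create(Map.<String, Object>of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            ListOffsetsResultInfo info = admin
                .listOffsets(Map.of(tp, OffsetSpec.maxTimestamp()))
                .partitionResult(tp)
                .get();
            // Expected: offset=1 (the t0+400 record); the bug returns offset=2.
            System.out.printf("maxTimestamp offset=%d timestamp=%d%n",
                info.offset(), info.timestamp());
        }
    }
}
```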


Re: [PR] MINOR: Increase parallelism for Jenkins [kafka]

2024-04-04 Thread via GitHub


github-actions[bot] commented on PR #15099:
URL: https://github.com/apache/kafka/pull/15099#issuecomment-2038787254

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15708: KRaft support in FetchRequestDownConversionConfigTest [kafka]

2024-04-04 Thread via GitHub


github-actions[bot] commented on PR #14715:
URL: https://github.com/apache/kafka/pull/14715#issuecomment-2038787793

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]

2024-04-04 Thread via GitHub


github-actions[bot] commented on PR #14666:
URL: https://github.com/apache/kafka/pull/14666#issuecomment-2038787933

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Commented] (KAFKA-16472) Integration tests in Java don't really run kraft case

2024-04-04 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834130#comment-17834130
 ] 

PoAn Yang commented on KAFKA-16472:
---

{quote}could you confirm that only these 4 test suites are impacted, and not 
others?
{quote}
Searching globally for `@ValueSource(strings = {"zk", "kraft"})` shows that 
ReassignPartitionsIntegrationTest and DescribeConsumerGroupTest are also 
impacted. Some other test cases, like TieredStorageTestHarness, use a 
different template, `{displayName}.quorum={0}`, so they are not impacted.

> Integration tests in Java don't really run kraft case
> -
>
> Key: KAFKA-16472
> URL: https://issues.apache.org/jira/browse/KAFKA-16472
> Project: Kafka
>  Issue Type: Test
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>
> The following test cases don't really run the kraft case. The reason is that 
> the test info doesn't contain the parameter name, so TestInfoUtils#isKRaft 
> always returns false.
>  * TopicCommandIntegrationTest
>  * DeleteConsumerGroupsTest
>  * AuthorizerIntegrationTest
>  * DeleteOffsetsConsumerGroupCommandIntegrationTest
>  
> We can add `options.compilerArgs += '-parameters'` after 
> [https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273]
>  to fix it.
>  
> Also, we have to add `String quorum` to cases in 
> DeleteOffsetsConsumerGroupCommandIntegrationTest.



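To illustrate the difference PoAn describes (the method names below are hypothetical): a positional placeholder such as `{0}` embeds the literal text `quorum=` in the template itself, so it works with or without `-parameters`, whereas `{argumentsWithNames}` depends on the parameter names being present in the compiled bytecode:

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class TemplateSketch {
    // Affected style: relies on reflection to learn the parameter name.
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    @ValueSource(strings = {"zk", "kraft"})
    void nameFromReflection(String quorum) { }

    // Unaffected style (as in TieredStorageTestHarness, per the comment):
    // "quorum=" is literal template text and {0} is the argument value.
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    void nameFromTemplate(String quorum) { }
}
```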


Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]

2024-04-04 Thread via GitHub


showuon commented on PR #15663:
URL: https://github.com/apache/kafka/pull/15663#issuecomment-2038807461

   @FrankYang0529, thanks for the fix! Nice catch!
   Questions: 
   1. How can we confirm that `String quorum` is missing only in 
`DeleteOffsetsConsumerGroupCommandIntegrationTest` and not in other places?
   2. How can we avoid this happening in the future, for example by adding 
some check before the tests start up? Do you have any ideas? 


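On the second question, one possible guard, sketched here with made-up names: fail fast when the rendered display name lacks the parameter name, instead of silently exercising only the zk variant:

```java
import org.junit.jupiter.api.TestInfo;

final class QuorumGuardSketch {
    // Could be called from the test harness setup before each parameterized
    // test to surface a missing -parameters flag as a hard failure.
    static void requireQuorumInDisplayName(TestInfo testInfo) {
        if (!testInfo.getDisplayName().contains("quorum=")) {
            throw new IllegalStateException(
                "Display name is missing 'quorum=' - was the test code " +
                "compiled with -parameters?");
        }
    }
}
```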



Re: [PR] KAFKA-16225 Flaky test suite LogDirFailureTest#testIOExceptionDuringLogRoll [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15637:
URL: https://github.com/apache/kafka/pull/15637#issuecomment-2038837427

   close as duplicate





Re: [PR] KAFKA-16225 Flaky test suite LogDirFailureTest#testIOExceptionDuringLogRoll [kafka]

2024-04-04 Thread via GitHub


chia7712 closed pull request #15637: KAFKA-16225 Flaky test suite 
LogDirFailureTest#testIOExceptionDuringLogRoll
URL: https://github.com/apache/kafka/pull/15637





[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-04-04 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834131#comment-17834131
 ] 

Chia-Ping Tsai commented on KAFKA-16310:


{quote}
Will this bug not be fixed in 3.6.2?
{quote}

yep. As we discussed above, the paths do NOT all use the same way to find the 
offset of the max timestamp, so it is not a regression in either 3.6 or 3.7. 
Instead, it is an issue that has existed for a while :) 

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset returned for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned instead). It causes the time index to record the wrong offset, so 
> the result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but 3.7.0 and trunk are returning offset 2.
> Even 5 seconds after producing, it's still returning 2 as the offset with 
> the max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}




