Re: [PR] KAFKA-15444: Native docker image for Apache Kafka (KIP-974) [kafka]

2024-05-21 Thread via GitHub


omkreddy merged PR #15927:
URL: https://github.com/apache/kafka/pull/15927


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Aesthetic, Uniformity Changes and Reducing warnings [kafka]

2024-05-21 Thread via GitHub


rreddy-22 opened a new pull request, #16026:
URL: https://github.com/apache/kafka/pull/16026

   Went through all the files in the group coordinator module, fixed up a few 
minor warnings, cleaned up a few leftover errors, and made the formatting more 
uniform.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1609228927


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-    isNew = false,
-    isFutureReplica = true,
-    offsetCheckpoints,
-    topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  // Add future replica log to partition's map if it's not existed

Review Comment:
   > However, adding the alter thread 
(`replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)`) 
is not covered by this check. Is it possible that the alter thread, which is 
invoked by another thread, removes the future log just before this thread adds 
the topic partition to `replicaAlterLogDirsManager`? It seems to me that the 
alter thread would then fail because the partition's future log is gone.
   
   That's possible. But I think that's fine, because the removal of the future 
log could be due to:
   1. The alter logDir operation completes. In this case, the next LeaderAndIsr 
request or topic partition update will be applied, and the fetcher will then be 
removed in `ReplicaManager#makeLeader` or `makeFollower`.
   2. Another log failure happened. In this case, `createLogIfNotExists` will 
fail, too.
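
   A minimal, self-contained sketch of the guard being discussed (illustrative 
only; the types below are simplified stand-ins, not the actual `Partition` or 
`LogManager` classes and not the code merged in the PR): both the future-log 
creation and the cleaner pause sit behind the same existence check, so the two 
operations are only attempted when no future log exists yet.
   
   ```scala
   // Sketch only: simplified stand-ins for the real Kafka classes.
   object FutureLogGuardSketch {
     final case class Partition(var futureLog: Option[String])
   
     def maybeCreateFutureLogAndPauseCleaning(partition: Partition,
                                              pauseCleaning: () => Unit): Unit = {
       // Only create the future log and pause cleaning when it does not exist yet.
       if (partition.futureLog.isEmpty) {
         partition.futureLog = Some("future-log") // stands in for createLogIfNotExists(...)
         pauseCleaning()                          // stands in for abortAndPauseCleaning(tp)
       }
     }
   
     def main(args: Array[String]): Unit = {
       val p = Partition(futureLog = None)
       maybeCreateFutureLogAndPauseCleaning(p, () => println("cleaning paused"))
       // Second call is a no-op: the future log already exists.
       maybeCreateFutureLogAndPauseCleaning(p, () => println("not printed"))
     }
   }
   ```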
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2123824613

   @junrao @showuon 
   Thanks for the review! Addressed all the review comments. PTAL. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227442


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
     emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
-    emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-  else {
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+    maxOffsetMetadata.messageOffsetOnly() ||
+    maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+    // We need to be careful before reading the segment as `maxOffsetMetadata` may not be a complete metadata:
+    // 1. If maxOffsetMetadata is message-offset-only, then return empty fetchDataInfo since
+    //    maxOffsetMetadata.offset is not on local log segments.
+    // 2. If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
   ack, kept the same behavior. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227180


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+    maxOffsetMetadata.messageOffsetOnly() ||
+    maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
   Addressed with the latest commit 77f90f63408ebe5b3da5c3305ad2affb7901eff4



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost

2024-05-21 Thread Jianbin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848436#comment-17848436
 ] 

Jianbin Chen commented on KAFKA-16662:
--

After I deleted all of the __cluster_metadata-0 directories, the problem no 
longer occurred when I started the cluster, but all my topic information was 
lost. Fortunately, this is just an offline test environment cluster. Based on 
this behavior, it is clear that the incompatibility between the 3.5 metadata 
version and the 3.7 version caused the problem. This makes me hesitant to 
attempt a smooth upgrade of the cluster. In the past, when using ZooKeeper, 
upgrading the broker never caused similar problems!

> UnwritableMetadataException: Metadata has been lost
> ---
>
> Key: KAFKA-16662
> URL: https://issues.apache.org/jira/browse/KAFKA-16662
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
> Environment: Docker Image (bitnami/kafka:3.7.0)
> via Docker Compose
>Reporter: Tobias Bohn
>Priority: Major
> Attachments: log.txt
>
>
> Hello,
> First of all: I am new to this Jira and apologize if anything is set or 
> specified incorrectly. Feel free to advise me.
> We currently have an error in our test system, which unfortunately I can't 
> solve, because I couldn't find anything related to it. No solution could be 
> found via the mailing list either.
> The error occurs when we want to start up a node. The node runs using Kraft 
> and is both a controller and a broker. The following error message appears at 
> startup:
> {code:java}
> kafka  | [2024-04-16 06:18:13,707] ERROR Encountered fatal fault: Unhandled 
> error initializing new publishers 
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> kafka  | org.apache.kafka.image.writer.UnwritableMetadataException: Metadata 
> has been lost because the following could not be represented in metadata 
> version 3.5-IV2: the directory assignment state of one or more replicas
> kafka  |        at 
> org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
> kafka  |        at 
> org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
> kafka  |        at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
> kafka  |        at 
> org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
> kafka  |        at 
> org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
> kafka  |        at 
> org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
> kafka  |        at 
> org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
> kafka  |        at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
> kafka  |        at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
> kafka  |        at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
> kafka  |        at java.base/java.lang.Thread.run(Thread.java:840)
> kafka exited with code 0 {code}
> We use Docker to operate the cluster. The error occurred while we were trying 
> to restart a node. All other nodes in the cluster are still running correctly.
> If you need further information, please let us know. The complete log is 
> attached to this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16783: Migrate RemoteLogMetadataManagerTest to new test infra [kafka]

2024-05-21 Thread via GitHub


chia7712 commented on PR #15983:
URL: https://github.com/apache/kafka/pull/15983#issuecomment-2123808122

   > I think we can file another PR to improve it: either we add a 
`future.get(30 sec)`, or we set a global `@Timeout` tag on the test class to 
limit the waiting time.
   
   I love the idea of a global timeout. I will file a JIRA for it.
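
   As a rough sketch of the global-timeout idea above (an illustration under 
assumptions, not the follow-up change itself): JUnit 5 supports a class-level 
`@Timeout`, which bounds every test method, so a hanging `future.get()` fails 
the test instead of blocking the build. The class and method names below are 
invented.
   
   ```scala
   import java.util.concurrent.TimeUnit
   import org.junit.jupiter.api.{Test, Timeout}
   
   // Sketch only: a hypothetical test class with a class-level timeout.
   // The annotation applies to every @Test method in the class.
   @Timeout(value = 30, unit = TimeUnit.SECONDS)
   class GlobalTimeoutExampleTest {
   
     @Test
     def waitsOnAFuture(): Unit = {
       val future = java.util.concurrent.CompletableFuture.completedFuture("done")
       // Without the class-level @Timeout, an uncompleted future here could hang forever.
       assert(future.get() == "done")
     }
   }
   ```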


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost

2024-05-21 Thread Jianbin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848434#comment-17848434
 ] 

Jianbin Chen commented on KAFKA-16662:
--

When I executed ./bin/kafka-features.sh --bootstrap-server 10.58.16.231:9092 
upgrade --metadata 3.7, it continuously output the following exceptions:
{panel:title=Broker logs}
[2024-05-22 11:30:36,491] INFO [UnifiedLog partition=remote-test-5, 
dir=/data01/kafka-logs-351] Incremented log start offset to 26267689 due to 
leader offset increment (kafka.log.UnifiedLog)
[2024-05-22 11:30:36,497] INFO [UnifiedLog partition=remote-test2-0, 
dir=/data01/kafka-logs-351] Incremented log start offset to 3099360 due to 
leader offset increment (kafka.log.UnifiedLog)
[2024-05-22 11:30:37,149] ERROR Failed to propagate directory assignments 
because the Controller returned error STALE_BROKER_EPOCH 
(org.apache.kafka.server.AssignmentsManager)
[2024-05-22 11:30:38,064] ERROR Failed to propagate directory assignments 
because the Controller returned error STALE_BROKER_EPOCH 
(org.apache.kafka.server.AssignmentsManager)
[2024-05-22 11:30:39,376] ERROR Failed to propagate directory assignments 
because the Controller returned error STALE_BROKER_EPOCH 
(org.apache.kafka.server.AssignmentsManager)
[2024-05-22 11:30:41,486] ERROR Failed to propagate directory assignments 
because the Controller returned error STALE_BROKER_EPOCH 
(org.apache.kafka.server.AssignmentsManager)
[2024-05-22 11:30:43,794] INFO [BrokerLifecycleManager id=3] Unable to register 
broker 3 because the controller returned error INVALID_REGISTRATION 
(kafka.server.BrokerLifecycleManager)
[2024-05-22 11:30:45,224] ERROR Failed to propagate directory assignments 
because the Controller returned error STALE_BROKER_EPOCH 
(org.apache.kafka.server.AssignmentsManager)
{panel}
controller logs:
{code:java}
java.util.concurrent.CompletionException: 
org.apache.kafka.common.errors.StaleBrokerEpochException: Expected broker epoch 
41885255, but got broker epoch -1
    at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
    at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
    at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636)
    at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
    at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:880)
    at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:148)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:137)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.common.errors.StaleBrokerEpochException: Expected 
broker epoch 41885255, but got broker epoch -1{code}

> UnwritableMetadataException: Metadata has been lost
> ---
>
> Key: KAFKA-16662
> URL: https://issues.apache.org/jira/browse/KAFKA-16662
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
> Environment: Docker Image (bitnami/kafka:3.7.0)
> via Docker Compose
>Reporter: Tobias Bohn
>Priority: Major
> Attachments: log.txt
>
>
> Hello,
> First of all: I am new to this Jira and apologize if anything is set or 
> specified incorrectly. Feel free to advise me.
> We currently have an error in our test system, which unfortunately I can't 
> solve, because I couldn't find anything related to it. No solution could be 
> found via the mailing list either.
> The error occurs when we want to start up a node. The node runs using Kraft 
> and is both a controller and a broker. The following error message appears at 
> startup:
> {code:java}
> kafka  | [2024-04-16 06:18:13,707] ERROR Encountered fatal fault: Unhandled 
> error initializing new publishers 
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> kafka  | org.apache.kafka.image.writer.UnwritableMetadataException: Metadata 
> has been lost because the following could not be represented in metadata 
> version 3.5-IV2: the directory assignment state of one or more replicas
> kafka  |        at 
> org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
> kafka  |        at 
> 

[jira] [Resolved] (KAFKA-16783) Migrate RemoteLogMetadataManagerTest to new test infra

2024-05-21 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16783.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Migrate RemoteLogMetadataManagerTest to new test infra
> --
>
> Key: KAFKA-16783
> URL: https://issues.apache.org/jira/browse/KAFKA-16783
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: storage_test
> Fix For: 3.8.0
>
>
> as title
> `TopicBasedRemoteLogMetadataManagerWrapperWithHarness` could be replaced by 
> `RemoteLogMetadataManagerTestUtils#builder`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16783: Migrate RemoteLogMetadataManagerTest to new test infra [kafka]

2024-05-21 Thread via GitHub


showuon merged PR #15983:
URL: https://github.com/apache/kafka/pull/15983


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost

2024-05-21 Thread Jianbin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848431#comment-17848431
 ] 

Jianbin Chen commented on KAFKA-16662:
--

Could someone please pay attention to this issue and help me out?
{code:java}
[admin@kafka-dev-d-010058016231 kafka]$ ./bin/kafka-features.sh 
--bootstrap-server 10.58.16.231:9092 describe
Feature: metadata.version    SupportedMinVersion: 3.0-IV1    
SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.5-IV2    Epoch: 
41885646{code}

> UnwritableMetadataException: Metadata has been lost
> ---
>
> Key: KAFKA-16662
> URL: https://issues.apache.org/jira/browse/KAFKA-16662
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
> Environment: Docker Image (bitnami/kafka:3.7.0)
> via Docker Compose
>Reporter: Tobias Bohn
>Priority: Major
> Attachments: log.txt
>
>
> Hello,
> First of all: I am new to this Jira and apologize if anything is set or 
> specified incorrectly. Feel free to advise me.
> We currently have an error in our test system, which unfortunately I can't 
> solve, because I couldn't find anything related to it. No solution could be 
> found via the mailing list either.
> The error occurs when we want to start up a node. The node runs using Kraft 
> and is both a controller and a broker. The following error message appears at 
> startup:
> {code:java}
> kafka  | [2024-04-16 06:18:13,707] ERROR Encountered fatal fault: Unhandled 
> error initializing new publishers 
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> kafka  | org.apache.kafka.image.writer.UnwritableMetadataException: Metadata 
> has been lost because the following could not be represented in metadata 
> version 3.5-IV2: the directory assignment state of one or more replicas
> kafka  |        at 
> org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
> kafka  |        at 
> org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
> kafka  |        at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
> kafka  |        at 
> org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
> kafka  |        at 
> org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
> kafka  |        at 
> org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
> kafka  |        at 
> org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
> kafka  |        at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
> kafka  |        at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
> kafka  |        at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
> kafka  |        at java.base/java.lang.Thread.run(Thread.java:840)
> kafka exited with code 0 {code}
> We use Docker to operate the cluster. The error occurred while we were trying 
> to restart a node. All other nodes in the cluster are still running correctly.
> If you need further information, please let us know. The complete log is 
> attached to this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-21 Thread via GitHub


artemlivshits commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1609158284


##
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##
@@ -75,16 +75,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {
-Features.supportedFeatures(
-  java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+val features = new util.HashMap[String, SupportedVersionRange]()
+  features.put(MetadataVersion.FEATURE_NAME,
 new SupportedVersionRange(
   MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
   if (unstableMetadataVersionsEnabled) {
 MetadataVersion.latestTesting.featureLevel
   } else {
 MetadataVersion.latestProduction.featureLevel
-  }
-)))
+  }))
+org.apache.kafka.server.common.Features.PRODUCTION_FEATURES.forEach { 
feature =>

Review Comment:
   Any reason this is a fully qualified class name and not just imported, as is 
usually done?



##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -16,72 +16,135 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values(), 
TestFeatureVersion::fromFeatureLevel, false);
 
-public final class Features {
-private final MetadataVersion version;
-private final Map finalizedFeatures;
-private final long finalizedFeaturesEpoch;
+public static final Features[] FEATURES;
+public static final List PRODUCTION_FEATURES;
+private final String name;
+private final FeatureVersion[] features;
+private final CreateMethod createFeatureVersionMethod;
+private final boolean usedInProduction;
 
-public static Features fromKRaftVersion(MetadataVersion version) {
-return new Features(version, Collections.emptyMap(), -1, true);
+Features(String name,
+ FeatureVersion[] features,
+ CreateMethod createMethod,
+ boolean usedInProduction) {
+this.name = name;
+this.features = features;
+this.createFeatureVersionMethod = createMethod;
+this.usedInProduction = usedInProduction;
 }
 
-public Features(
-MetadataVersion version,
-Map finalizedFeatures,
-long finalizedFeaturesEpoch,
-boolean kraftMode
-) {
-this.version = version;
-this.finalizedFeatures = new HashMap<>(finalizedFeatures);
-this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-// In KRaft mode, we always include the metadata version in the 
features map.
-// In ZK mode, we never include it.
-if (kraftMode) {
-this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
-} else {
-this.finalizedFeatures.remove(FEATURE_NAME);
-}
+static {
+Features[] enumValues = Features.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.usedInProduction).collect(Collectors.toList());
 }
 
-public MetadataVersion metadataVersion() {
-return version;
+public String featureName() {
+return name;
 }
 
-public Map finalizedFeatures() {
-return finalizedFeatures;
+public FeatureVersion[] features() {
+return features;
 }
 
-public long finalizedFeaturesEpoch() {
-return finalizedFeaturesEpoch;
+/**
+ * Creates a FeatureVersion 

Re: [PR] KAFKA-16783: Migrate RemoteLogMetadataManagerTest to new test infra [kafka]

2024-05-21 Thread via GitHub


showuon commented on PR #15983:
URL: https://github.com/apache/kafka/pull/15983#issuecomment-2123794220

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost

2024-05-21 Thread Jianbin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848424#comment-17848424
 ] 

Jianbin Chen edited comment on KAFKA-16662 at 5/22/24 3:10 AM:
---

I have encountered the same issue. Can anyone help me with this? I upgraded 
from 3.5.1 to 3.7.0, and I have already changed inter.broker.protocol.version 
to 3.7 and ran it for some time.

But I have never executed

 
{code:java}
./bin/kafka-features.sh upgrade --metadata 3.7 {code}
The last time I restarted the cluster, I found that it could not be started 
anymore. The last line of the log is as follows:

 
{code:java}
[2024-05-22 11:01:41,087] INFO [MetadataLoader id=3] 
maybePublishMetadata(LOG_DELTA): The loader is still catching up because we 
have loaded up to offset 41872530, but the high water mark is 41872532 
(org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,088] INFO [MetadataLoader id=3] 
maybePublishMetadata(LOG_DELTA): The loader finished catching up to the current 
high water mark of 41872532 (org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,092] INFO [BrokerLifecycleManager id=3] The broker has 
caught up. Transitioning from STARTING to RECOVERY. 
(kafka.server.BrokerLifecycleManager)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Finished waiting for the 
controller to acknowledge that we are caught up (kafka.server.BrokerServer)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Waiting for the initial 
broker metadata update to be published (kafka.server.BrokerServer)
[2024-05-22 11:01:41,095] ERROR Encountered fatal fault: Unhandled error 
initializing new publishers 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been 
lost because the following could not be represented in metadata version 
3.5-IV2: the directory assignment state of one or more replicas
    at 
org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
    at 
org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
    at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
    at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
    at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
    at 
org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
    at 
org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:1583) {code}
 

 

 


was (Author: jianbin):
I have encountered the same issue. Can anyone help me with this? I upgraded 
from 3.5.1 to 3.7.0, and I have already changed inter.broker.protocol.version 
to 3.7 and ran it for some time.

But I have never executed

`./bin/kafka-features.sh upgrade --metadata 3.7`

The last time I restarted the cluster, I found that it could not be started 
anymore. The last line of the log is as follows:

```

[2024-05-22 11:01:41,087] INFO [MetadataLoader id=3] 
maybePublishMetadata(LOG_DELTA): The loader is still catching up because we 
have loaded up to offset 41872530, but the high water mark is 41872532 
(org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,088] INFO [MetadataLoader id=3] 
maybePublishMetadata(LOG_DELTA): The loader finished catching up to the current 
high water mark of 41872532 (org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,092] INFO [BrokerLifecycleManager id=3] The broker has 
caught up. Transitioning from STARTING to RECOVERY. 
(kafka.server.BrokerLifecycleManager)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Finished waiting for the 
controller to acknowledge that we are caught up (kafka.server.BrokerServer)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Waiting for the initial 
broker metadata update to be published (kafka.server.BrokerServer)
[2024-05-22 11:01:41,095] ERROR Encountered fatal fault: Unhandled error 
initializing new publishers 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been 
lost because the following could not be represented in metadata version 
3.5-IV2: the directory assignment state of one or more replicas
    at 
org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
    at 
org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
    at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
    at 

Re: [PR] KAFKA-16771 First log directory printed twice when formatting storage [kafka]

2024-05-21 Thread via GitHub


gongxuanzhang commented on code in PR #16010:
URL: https://github.com/apache/kafka/pull/16010#discussion_r1609198047


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -452,19 +452,20 @@ object StorageTool extends Logging {
   stream.println("All of the log directories are already formatted.")
 } else {
   metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => {
-copier.setLogDirProps(logDir, new 
MetaProperties.Builder(metaProperties).
-  setDirectoryId(copier.generateValidDirectoryId()).
+val loggingCopier = new 
MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
+loggingCopier.setLogDirProps(logDir, new 
MetaProperties.Builder(metaProperties).
+  setDirectoryId(loggingCopier.generateValidDirectoryId()).
   build())
-copier.setPreWriteHandler((logDir, _, _) => {
+loggingCopier.setPreWriteHandler((logDir, _, _) => {
   stream.println(s"Formatting $logDir with metadata.version 
$metadataVersion.")
   Files.createDirectories(Paths.get(logDir))
   val bootstrapDirectory = new BootstrapDirectory(logDir, 
Optional.empty())
   bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
 })
-copier.setWriteErrorHandler((logDir, e) => {
+loggingCopier.setWriteErrorHandler((logDir, e) => {
   throw new TerseFailure(s"Error while writing meta.properties file 
$logDir: ${e.getMessage}")
 })
-copier.writeLogDirChanges()

Review Comment:
   If we just move this line out of the loop, the log directories are printed 
in reverse order.
   The test cases have been added. Please review, @chia7712.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost

2024-05-21 Thread Jianbin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848424#comment-17848424
 ] 

Jianbin Chen commented on KAFKA-16662:
--

I have encountered the same issue. Can anyone help me with this? I upgraded 
from 3.5.1 to 3.7.0, and I have already changed inter.broker.protocol.version 
to 3.7 and ran it for some time.

But I have never executed

`./bin/kafka-features.sh upgrade --metadata 3.7`

The last time I restarted the cluster, I found that it could not be started 
anymore. The last line of the log is as follows:

```

[2024-05-22 11:01:41,087] INFO [MetadataLoader id=3] 
maybePublishMetadata(LOG_DELTA): The loader is still catching up because we 
have loaded up to offset 41872530, but the high water mark is 41872532 
(org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,088] INFO [MetadataLoader id=3] 
maybePublishMetadata(LOG_DELTA): The loader finished catching up to the current 
high water mark of 41872532 (org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,092] INFO [BrokerLifecycleManager id=3] The broker has 
caught up. Transitioning from STARTING to RECOVERY. 
(kafka.server.BrokerLifecycleManager)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Finished waiting for the 
controller to acknowledge that we are caught up (kafka.server.BrokerServer)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Waiting for the initial 
broker metadata update to be published (kafka.server.BrokerServer)
[2024-05-22 11:01:41,095] ERROR Encountered fatal fault: Unhandled error 
initializing new publishers 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been 
lost because the following could not be represented in metadata version 
3.5-IV2: the directory assignment state of one or more replicas
    at 
org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
    at 
org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
    at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
    at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
    at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
    at 
org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
    at 
org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:1583)

```

> UnwritableMetadataException: Metadata has been lost
> ---
>
> Key: KAFKA-16662
> URL: https://issues.apache.org/jira/browse/KAFKA-16662
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
> Environment: Docker Image (bitnami/kafka:3.7.0)
> via Docker Compose
>Reporter: Tobias Bohn
>Priority: Major
> Attachments: log.txt
>
>
> Hello,
> First of all: I am new to this Jira and apologize if anything is set or 
> specified incorrectly. Feel free to advise me.
> We currently have an error in our test system, which unfortunately I can't 
> solve, because I couldn't find anything related to it. No solution could be 
> found via the mailing list either.
> The error occurs when we want to start up a node. The node runs using Kraft 
> and is both a controller and a broker. The following error message appears at 
> startup:
> {code:java}
> kafka  | [2024-04-16 06:18:13,707] ERROR Encountered fatal fault: Unhandled 
> error initializing new publishers 
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> kafka  | org.apache.kafka.image.writer.UnwritableMetadataException: Metadata 
> has been lost because the following could not be represented in metadata 
> version 3.5-IV2: the directory assignment state of one or more replicas
> kafka  |        at 
> org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
> kafka  |        at 
> org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
> kafka  |        at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
> kafka  |        at 
> org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
> kafka  |        at 
> org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
> kafka  |        at 
> org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
> kafka  |        at 
> 

Re: [PR] KAFKA-16669: Remove extra collection copy when generating DescribeAclsResource [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15924:
URL: https://github.com/apache/kafka/pull/15924#discussion_r1609187500


##
core/src/main/scala/kafka/server/AclApis.scala:
##
@@ -69,7 +69,7 @@ class AclApis(authHelper: AuthHelper,
   case Some(auth) =>
 val filter = describeAclsRequest.filter
 val returnedAcls = new util.HashSet[AclBinding]()
-auth.acls(filter).forEach(returnedAcls.add)
+returnedAcls.add(auth.acls(filter).iterator().next())

Review Comment:
   OK, I know why you did this change now.
   > we don't need to collect all items to a new collection, right?
   
   I believe what @chia7712 meant is that we don't have to do a collection copy 
here, because we already allow passing an `Iterable` into the `aclsResource` 
method. To make it clear, in the JIRA we said:
   > 1. Iterable -> HashSet
   
   This is the collection copy we want to avoid here. So we don't need 
`returnedAcls` anymore; we can pass the result of `auth.acls(filter)` into 
`aclsResource` directly. Is that clear?
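
   A tiny, self-contained sketch of the point above, with made-up names (not 
the actual `AclApis` code): when the consumer accepts a `java.lang.Iterable`, 
the authorizer's result can be handed over directly instead of first being 
copied into a `HashSet`.
   
   ```scala
   import java.lang.{Iterable => JIterable}
   import java.util.Arrays
   
   object IterablePassThroughSketch {
     // Stand-in for a method like aclsResource that accepts an Iterable.
     def aclsResources(acls: JIterable[String]): Int = {
       var count = 0
       val it = acls.iterator()
       while (it.hasNext) { it.next(); count += 1 }
       count
     }
   
     def main(args: Array[String]): Unit = {
       // Stand-in for auth.acls(filter), which returns an Iterable of bindings.
       val fromAuthorizer: JIterable[String] = Arrays.asList("acl-1", "acl-2")
   
       // Avoided: val copy = new java.util.HashSet[String](); fromAuthorizer.forEach(copy.add)
       // Pass the Iterable straight through; no intermediate collection copy.
       println(aclsResources(fromAuthorizer)) // prints 2
     }
   }
   ```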



##
core/src/main/scala/kafka/server/AclApis.scala:
##
@@ -69,7 +69,7 @@ class AclApis(authHelper: AuthHelper,
   case Some(auth) =>
 val filter = describeAclsRequest.filter
 val returnedAcls = new util.HashSet[AclBinding]()
-auth.acls(filter).forEach(returnedAcls.add)
+returnedAcls.add(auth.acls(filter).iterator().next())

Review Comment:
   Originally we added all of the `auth.acls(filter)` content into 
`returnedAcls`, but after this change we only add the first element. Why do we 
need this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   > However, if nextOffsetMetadata doesn't change and somehow startOffset > 
maxOffsetMetadata.messageOffset, then we could loop forever.
   
   I still don't see why the infinite loop would happen. In `LocalLog#read`, we 
check whether `startOffset > nextOffsetMetadata.messageOffset`; if so, an 
`OffsetOutOfRangeException` is thrown.
   
   ```
 def read(startOffset: Long,
  maxLength: Int,
  minOneMessage: Boolean,
  maxOffsetMetadata: LogOffsetMetadata,
  includeAbortedTxns: Boolean): FetchDataInfo = {
   
   
 // [Luke] We will check if startOffset > 
nextOffsetMetadata.messageOffset (endOffset), 
 // so if the provided `maxOffsetMetadata == nextOffsetMetadata`, 
 // the `OffsetOutOfRangeException` should be thrown before entering 
another `convertToOffsetMetadataOrThrow` loop
   
 val endOffsetMetadata = nextOffsetMetadata
 val endOffset = endOffsetMetadata.messageOffset
 var segmentOpt = segments.floorSegment(startOffset)
   
 // return error on attempt to read beyond the log end offset
 if (startOffset > endOffset || !segmentOpt.isPresent)
   throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition,  but we only have log segments upto 
$endOffset.")
   
 if (startOffset == maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
 
// [Luke] So for the first time, the provided `maxOffsetMetadata` might 
be highWaterMark or LastStableOffset, 
// it could enter this `else if` condition because HWM/LSO should always be 
less than LEO. But the 2nd time won't enter here.
   
 else if (startOffset > maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
   ```
   So there should be no chance that calling `LocalLog#read` with 
`maxOffsetMetadata = nextOffsetMetadata` enters another 
`convertToOffsetMetadataOrThrow` loop, because this time 
`if (startOffset > maxOffsetMetadata.messageOffset)` is equivalent to 
`if (startOffset > endOffset)`.
   
   If this did happen, it would mean `logEndOffset < highWaterMarkOffset` or 
`logEndOffset < lastStableOffset`, which I don't think would happen.
   
   Does that make sense?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   > However, if nextOffsetMetadata doesn't change and somehow startOffset > 
maxOffsetMetadata.messageOffset, then we could loop forever.
   
   I still don't see why the infinite loop would happen. In `LocalLog#read`, we 
check whether `startOffset > nextOffsetMetadata.messageOffset`; if so, an 
`OffsetOutOfRangeException` is thrown.
   
   ```
 def read(startOffset: Long,
  maxLength: Int,
  minOneMessage: Boolean,
  maxOffsetMetadata: LogOffsetMetadata,
  includeAbortedTxns: Boolean): FetchDataInfo = {
   
   
 // [Luke] We will check if startOffset > 
nextOffsetMetadata.messageOffset (endOffset), 
 // so if the provided `maxOffsetMetadata == nextOffsetMetadata`, 
 // the `OffsetOutOfRangeException` should be thrown before entering 
another `convertToOffsetMetadataOrThrow` loop
   
 val endOffsetMetadata = nextOffsetMetadata
 val endOffset = endOffsetMetadata.messageOffset
 var segmentOpt = segments.floorSegment(startOffset)
   
 // return error on attempt to read beyond the log end offset
 if (startOffset > endOffset || !segmentOpt.isPresent)
   throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition,  but we only have log segments upto 
$endOffset.")
   
 if (startOffset == maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
 
// [Luke] So for the first time, the provided `maxOffsetMetadata` might 
be highWaterMark or LastStableOffset, 
// it could enter this `else if` condition because HWM/LSO should always be 
less than LEO. But the 2nd time won't enter here.
   
 else if (startOffset > maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
   ```
   So there should be no chance that calling `LocalLog#read` with 
`maxOffsetMetadata = nextOffsetMetadata` enters another 
`convertToOffsetMetadataOrThrow` loop, because 
`if (startOffset > maxOffsetMetadata.messageOffset)` is equivalent to 
`if (startOffset > endOffset)`.
   
   If this did happen, it would mean `logEndOffset < highWaterMarkOffset` or 
`logEndOffset < lastStableOffset`, which I don't think would happen.
   
   Does that make sense?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   > However, if nextOffsetMetadata doesn't change and somehow startOffset > 
maxOffsetMetadata.messageOffset, then we could loop forever.
   
   I still don't see why the infinite loop would happen. In `LocalLog#read`, we 
check whether `startOffset > nextOffsetMetadata.messageOffset`; if so, an 
`OffsetOutOfRangeException` is thrown.
   
   ```
 def read(startOffset: Long,
  maxLength: Int,
  minOneMessage: Boolean,
  maxOffsetMetadata: LogOffsetMetadata,
  includeAbortedTxns: Boolean): FetchDataInfo = {
   
   
 // [Luke] We will check if startOffset > 
nextOffsetMetadata.messageOffset (endOffset), 
 // so if the provided `maxOffsetMetadata == nextOffsetMetadata`, 
 // the `OffsetOutOfRangeException` should be thrown before entering 
another `convertToOffsetMetadataOrThrow` loop
   
 val endOffsetMetadata = nextOffsetMetadata
 val endOffset = endOffsetMetadata.messageOffset
 var segmentOpt = segments.floorSegment(startOffset)
   
 // return error on attempt to read beyond the log end offset
 if (startOffset > endOffset || !segmentOpt.isPresent)
   throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition,  but we only have log segments upto 
$endOffset.")
   
 if (startOffset == maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
 
// [Luke] So for the first time, the provided `maxOffsetMetadata` might 
be highWaterMark or LastStableOffset, 
// it could enter this `else if` condition because HWM/LSO should always be 
less than LEO. But the 2nd time won't enter here.
   
 else if (startOffset > maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
   ```
   So there should be no chance that calling `LocalLog#read` with 
`maxOffsetMetadata = nextOffsetMetadata` enters another 
`convertToOffsetMetadataOrThrow` loop.
   
   If this did happen, it would mean `logEndOffset < highWaterMarkOffset` or 
`logEndOffset < lastStableOffset`, which I don't think would happen.
   
   Does that make sense?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   > However, if nextOffsetMetadata doesn't change and somehow startOffset > 
maxOffsetMetadata.messageOffset, then we could loop forever.
   
   I still don't see why the infinite loop would happen. In `LocalLog#read`, we 
check whether `startOffset > nextOffsetMetadata.messageOffset`; if so, an 
`OffsetOutOfRangeException` is thrown.
   
   ```
 def read(startOffset: Long,
  maxLength: Int,
  minOneMessage: Boolean,
  maxOffsetMetadata: LogOffsetMetadata,
  includeAbortedTxns: Boolean): FetchDataInfo = {
   
   
 // [Luke] We will check if startOffset > 
nextOffsetMetadata.messageOffset, 
 // so if the provided `maxOffsetMetadata == nextOffsetMetadata`, 
 // the `OffsetOutOfRangeException` should be thrown before entering 
another `convertToOffsetMetadataOrThrow` loop
   
 val endOffsetMetadata = nextOffsetMetadata
 val endOffset = endOffsetMetadata.messageOffset
 var segmentOpt = segments.floorSegment(startOffset)
   
 // return error on attempt to read beyond the log end offset
 if (startOffset > endOffset || !segmentOpt.isPresent)
   throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition,  but we only have log segments upto 
$endOffset.")
   
 if (startOffset == maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
 
// [Luke] So for the first time, the provided `maxOffsetMetadata` might 
be highWaterMark or LastStableOffset, 
// it could enter this `else if` condition because HWM/LSO should is 
always less than LEO. But the 2nd time won't enter here.
   
 else if (startOffset > maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
   ```
   So there should be no chance that calling `LocalLog#read` with 
`maxOffsetMetadata = nextOffsetMetadata` enters another 
`convertToOffsetMetadataOrThrow` loop.
   
   If this did happen, it would mean `logEndOffset < highWaterMarkOffset` or 
`logEndOffset < lastStableOffset`, which I don't think would happen.
   
   Does that make sense?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   > However, if nextOffsetMetadata doesn't change and somehow startOffset > 
maxOffsetMetadata.messageOffset, then we could loop forever.
   
   I still don't see why the infinite loop would happen. In `LocalLog#read`, we 
check whether `startOffset > nextOffsetMetadata.messageOffset`; if so, an 
`OffsetOutOfRangeException` is thrown.
   
   ```
 def read(startOffset: Long,
  maxLength: Int,
  minOneMessage: Boolean,
  maxOffsetMetadata: LogOffsetMetadata,
  includeAbortedTxns: Boolean): FetchDataInfo = {
   
   
 // [Luke] We will check if startOffset > 
nextOffsetMetadata.messageOffset, so if the provided `maxOffsetMetadata == 
nextOffsetMetadata`, the `OffsetOutOfRangeException` should be thrown before 
entering another `convertToOffsetMetadataOrThrow` loop
   
 val endOffsetMetadata = nextOffsetMetadata
 val endOffset = endOffsetMetadata.messageOffset
 var segmentOpt = segments.floorSegment(startOffset)
   
 // return error on attempt to read beyond the log end offset
 if (startOffset > endOffset || !segmentOpt.isPresent)
   throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition,  but we only have log segments upto 
$endOffset.")
   
 if (startOffset == maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
 
// [Luke] So for the first time, the provided `maxOffsetMetadata` might 
be highWaterMark or LastStableOffset, it could enter this `else if` condition 
because HWM/LSO should always be less than LEO. But the 2nd time won't enter 
here.
   
 else if (startOffset > maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
   ```
   So there should be no chance that we call `LocalLog#read` with 
`maxOffsetMetadata = nextOffsetMetadata` and enter another 
`convertToOffsetMetadataOrThrow` loop.
   
   If this did happen, it would mean `logEndOffset < highWaterMarkOffset` or 
`logEndOffset < lastStableOffset`, which I don't think can happen.
   
   Does that make sense?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16795: Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter [kafka]

2024-05-21 Thread via GitHub


brandboat commented on PR #16020:
URL: https://github.com/apache/kafka/pull/16020#issuecomment-2123611516

   > The benefit of this approach is that we don't need to add removed class 
back. WDYT?
   
   This one is way better, thanks ! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working

2024-05-21 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848404#comment-17848404
 ] 

Greg Harris commented on KAFKA-16798:
-

Hi [~sektor.coder] thanks for the ticket!

The log message that you're looking at is in WorkerSourceTask: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L235]
It should be printed every offset.flush.interval.ms: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L75-L78]
 
That periodic operation is only for writing the task "source offsets", which 
are used to store the replication progress. It is completely separate from the 
consumer group sync feature that it sounds like you're trying to configure.


The analogous log message for that is this one in the MirrorCheckpointTask: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L387]
 

If you're not seeing that log message, you might look at the OffsetSyncStore: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L113-L154]
 and in MirrorCheckpointTask: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L197-L224]
 for reasons why the offset translation/syncing might not be happening as you 
expect.
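
For reference, here is a minimal sketch of the settings involved, assuming a 
dedicated-mode mm2.properties (the property names are the documented 
MirrorMaker 2 and Connect ones; the interval values are only illustrative):
{noformat}
# Connect worker setting: how often source tasks flush their "source offsets"
# (replication progress); this drives the "Committing offsets ..." log lines.
offset.flush.interval.ms=60000

# MirrorCheckpointConnector settings: how often checkpoints are emitted and
# how often consumer group offsets are synced to the target cluster.
emit.checkpoints.enabled=true
emit.checkpoints.interval.seconds=60
sync.group.offsets.enabled=true
sync.group.offsets.interval.seconds=2
{noformat}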

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> -
>
> Key: KAFKA-16798
> URL: https://issues.apache.org/jira/browse/KAFKA-16798
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0
>Reporter: Thanos Athanasopoulos
>Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication 
> logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i 
> "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds
>                                  "
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* 
> as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> 

Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1609062958


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -105,12 +106,19 @@ private KafkaBasedLog 
createBackingStore(MirrorCheckpointConfig
 /**
  * Start the OffsetSyncStore, blocking until all previous Offset Syncs 
have been read from backing storage.
  */
-public void start() {
-backingStore.start();
+public void start(boolean initializationMustReadToEnd) {
+this.initializationMustReadToEnd = initializationMustReadToEnd;
+log.debug("OffsetSyncStore starting - must read to OffsetSync end = ", 
initializationMustReadToEnd);

Review Comment:
   noticed one typo here, the log message actually doesn't print the boolean
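   
   For reference, a minimal sketch of the corrected statement (only the SLF4J 
`{}` placeholder is added so the boolean actually gets printed; this is not 
the actual PR diff):
   ```java
   // '{}' tells SLF4J to substitute initializationMustReadToEnd into the message
   log.debug("OffsetSyncStore starting - must read to OffsetSync end = {}", initializationMustReadToEnd);
   ```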



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-21 Thread via GitHub


junrao commented on PR #15673:
URL: https://github.com/apache/kafka/pull/15673#issuecomment-2123590782

   @clolov : 3.8.0 feature freeze is about 1 week away. It would be useful to 
get this PR in before the 3.8 branch is cut. Any updates on this PR? What's an 
example of the failed test?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]

2024-05-21 Thread via GitHub


junrao commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1609025456


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2095,7 +2101,8 @@ object UnifiedLog extends Logging {
   }
 
   /**
-   * If the recordVersion is >= RecordVersion.V2, then create and return a 
LeaderEpochFileCache.
+   * If the recordVersion is >= RecordVersion.V2, then create a new 
LeaderEpochFileCache instance
+   * or update current cache if any with the new checkpoint and return it.

Review Comment:
   How about changing it to the following?
   
   "If the recordVersion is >= RecordVersion.V2, create a new 
LeaderEpochFileCache instance. Loading the epoch entries from the backing 
checkpoint file or the provided currentCache if not empty."



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -42,10 +43,15 @@
  * 
  * Leader Epoch = epoch assigned to each leader by the controller.
  * Offset = offset of the first message in each epoch.
+ * 
+ * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flushes the 
epoch-entry changes to checkpoint asynchronously.

Review Comment:
   flushes => flush



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -390,7 +424,27 @@ public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint 
leaderEpochCheckpoint)
 lock.readLock().lock();
 try {
 leaderEpochCheckpoint.write(epochEntries());
-return new LeaderEpochFileCache(topicPartition, 
leaderEpochCheckpoint);
+// We instantiate LeaderEpochFileCache after writing 
leaderEpochCheckpoint,
+// hence it is guaranteed that the new cache is consistent with 
the latest epoch entries.
+return new LeaderEpochFileCache(topicPartition, 
leaderEpochCheckpoint, scheduler);
+} finally {
+lock.readLock().unlock();
+}
+}
+
+/**
+ * Returns a new LeaderEpochFileCache which contains same
+ * epoch entries with replacing backing checkpoint

Review Comment:
   checkpoint => checkpoint file



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -313,14 +345,14 @@ public void truncateFromEnd(long endOffset) {
 if (endOffset >= 0 && epochEntry.isPresent() && 
epochEntry.get().startOffset >= endOffset) {
 List removedEntries = removeFromEnd(x -> 
x.startOffset >= endOffset);
 
-// We intentionally don't force flushing change to the device 
here because:
+// We flush the change to the device in the background because:

Review Comment:
   It would be useful to explain in the comment that the reason async flush 
works is because the stale epochs always have more entries and no missing 
entries.



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -55,16 +61,40 @@ public class LeaderEpochFileCache {
 /**
  * @param topicPartition the associated topic partition
  * @param checkpoint the checkpoint file
+ * @param scheduler  the scheduler to use for async I/O operations
  */
 @SuppressWarnings("this-escape")
-public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint, Scheduler scheduler) {
 this.checkpoint = checkpoint;
 this.topicPartition = topicPartition;
+this.scheduler = scheduler;
 LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
 log = logContext.logger(LeaderEpochFileCache.class);
 checkpoint.read().forEach(this::assign);
 }
 
+/**
+ * Instantiates a new LeaderEpochFileCache with replacing checkpoint with 
given one
+ * without restoring the cache from the checkpoint, with retaining the 
current epoch entries.

Review Comment:
   How about the following?
   
   "Instantiate a new LeaderEpochFileCache with the provided epoch entries 
instead of loading them from the backing checkpoint file. The provided epoch 
entries are expected to be no less fresh than the checkpoint file."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on PR #16006:
URL: https://github.com/apache/kafka/pull/16006#issuecomment-2123537218

   Tests should be fixed now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2123528226

   @gharris1727 I gave up and used the ugly try. 
   That warning is not occurring in every test... But I went all the way in 
`OffsetSyncStoreTest` as I prefer consistency to beauty. 
   Removed a couple of warnings in MirrorCheckpointTaskTest too in the second 
commit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16626: Lazily convert subscribed topic names to topic ids [kafka]

2024-05-21 Thread via GitHub


jeffkbkim commented on PR #15970:
URL: https://github.com/apache/kafka/pull/15970#issuecomment-2123510093

   @dajac thanks for the review. I have addressed your comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608986312


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
-emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-  else {
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+// We need to be careful before reading the segment as 
`maxOffsetMetadata` may not be a complete metadata:
+// 1. If maxOffsetMetadata is message-offset-only, then return empty 
fetchDataInfo since
+// maxOffsetMetadata.offset is not on local log segments.
+// 2. If maxOffsetMetadata.segmentBaseOffset is smaller than 
segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
   Looking at the code again. @showuon is correct. 
`convertToOffsetMetadataOrThrow(startOffset)` uses nextOffset, which always has 
the offset metadata. So, we can keep the 
`convertToOffsetMetadataOrThrow(startOffset)` call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608984545


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
   Returning the current offset is correct. I am wondering why the suggested 
approach returns nextOffset. `segment.read(startOffset, maxLength, maxPosition, 
minOneMessage)` should return a non-null fetchDataInfo since the startOffset 
exists in the first segment, right? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16779) Kafka retains logs past specified retention

2024-05-21 Thread Nicholas Feinberg (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847997#comment-17847997
 ] 

Nicholas Feinberg edited comment on KAFKA-16779 at 5/21/24 9:41 PM:


When we explicitly set topics' retention to 4d (34560ms), our brokers 
immediately expired the surprisingly old logs. When we removed the setting, 
they began accumulating old logs again.

I've confirmed that the same setting is present in the brokers' 
`server.properties` file - that is, they have `log.retention.hours=96`. I've 
also checked and confirmed that topics do not have an explicitly set retention 
that would override this.


was (Author: nfeinberg):
When we explicitly set topics' retention to 4d (34560ms), our brokers 
immediately expired the surprisingly old logs.

I've confirmed that the same setting is present in the brokers' 
`server.properties` file - that is, they have `log.retention.hours=96`. I've 
also checked and confirmed that topics do not have an explicitly set retention 
that would override this.

> Kafka retains logs past specified retention
> ---
>
> Key: KAFKA-16779
> URL: https://issues.apache.org/jira/browse/KAFKA-16779
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Nicholas Feinberg
>Priority: Major
>  Labels: expiration, retention
> Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, 
> kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, 
> state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz
>
>
> In a Kafka cluster with all topics set to four days of retention or longer 
> (34560ms), most brokers seem to be retaining six days of data.
> This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
> and thus are regularly rolling new log segments. We observe this unexpectedly 
> high retention both via disk usage statistics and by requesting the oldest 
> available messages from Kafka.
> Some of these brokers crashed with an 'mmap failed' error (attached). When 
> those brokers started up again, they returned to the expected four days of 
> retention.
> Manually restarting brokers also seems to cause them to return to four days 
> of retention. Demoting and promoting brokers only has this effect on a small 
> part of the data hosted on a broker.
> These hosts had ~170GiB of free memory available. We saw no signs of pressure 
> on either system or JVM heap memory before or after they reported this error. 
> Committed memory seems to be around 10%, so this doesn't seem to be an 
> overcommit issue.
> This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). 
> Prior to the upgrade, it was running on Kafka 2.4.
> We last reduced retention for ops on May 7th, after which we restored 
> retention to our default of four days. This was the second time we've 
> temporarily reduced and restored retention since the upgrade. This problem 
> did not manifest the previous time we did so, nor did it manifest on our 
> other Kafka 3.7 clusters.
> We are running on AWS 
> [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
> have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
> (i.e. unraided).
> Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
> we're still using Zookeeper.
> Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
> hosts. Please let me know if I can provide any further information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608965614


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
   > if you approve I'd also backport the fix to 3.7
   
   I'm on the fence about that, leaning towards yes. I regret backporting 
KAFKA-12468 so far and introducing this issue, and I didn't communicate it 
properly to users. I think you can backport this once you have a full release 
note written that can be backported at the same time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608959573


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
   I have somewhat strong feelings, I wouldn't call them very strong. If 
someone noticed this IDE warning and created a ticket and a PR to fix it, I 
would review that. Am I going to make the build enforce this warning? No, but I 
have seen other situations where the warning did point out real resource 
leaks...
   
   I just wanted to save the effort required to go and rework this later, and 
prevent this PR from introducing an easily avoidable warning. I agree with you 
about suppressing warnings, I don't think that is a healthy practice to have.
   
   I just tried making this a try-with-resources and the indenting turned out 
fine. The body of backingStoreStart is at the exact same indentation as it is 
currently.
   ```
   try (FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
   @Override
   void backingStoreStart() {
   // read a sync during startup
   sync(tp, 100, 200);
   assertEquals(OptionalLong.empty(), translateDownstream(null, 
tp, 0));
   assertEquals(OptionalLong.empty(), translateDownstream(null, 
tp, 100));
   assertEquals(OptionalLong.empty(), translateDownstream(null, 
tp, 200));
   }
   }) {
   // no offsets exist and store is not started
   assertEquals(OptionalLong.empty(), 
store.translateDownstream(null, tp, 0));
   assertEquals(OptionalLong.empty(), 
store.translateDownstream(null, tp, 100));
   assertEquals(OptionalLong.empty(), 
store.translateDownstream(null, tp, 200));
   
   // After the store is started all offsets are visible
   store.start(true);
   
   assertEquals(OptionalLong.of(-1), 
store.translateDownstream(null, tp, 0));
   assertEquals(OptionalLong.of(200), 
store.translateDownstream(null, tp, 100));
   assertEquals(OptionalLong.of(201), 
store.translateDownstream(null, tp, 200));
   }
   ```
   Here's the Consumer alternative I thought about, which uses one less 
indentation level at the cost of a variable, a field, and two constructors:
   ```
  Consumer init = store -> {
   // read a sync during startup
   store.sync(tp, 100, 200);
   assertEquals(OptionalLong.empty(), 
store.translateDownstream(null, tp, 0));
   assertEquals(OptionalLong.empty(), 
store.translateDownstream(null, tp, 100));
   assertEquals(OptionalLong.empty(), 
store.translateDownstream(null, tp, 200));
   };
   try (FakeOffsetSyncStore store = new FakeOffsetSyncStore(init)) {
   // no offsets exist and store is not started
   assertEquals(OptionalLong.empty(), 
store.translateDownstream(null, tp, 0));
   assertEquals(OptionalLong.empty(), 
store.translateDownstream(null, tp, 100));
   assertEquals(OptionalLong.empty(), 
store.translateDownstream(null, tp, 200));
   
   // After the store is started all offsets are visible
   store.start(true);
   
   assertEquals(OptionalLong.of(-1), 
store.translateDownstream(null, tp, 0));
   assertEquals(OptionalLong.of(200), 
store.translateDownstream(null, tp, 100));
   assertEquals(OptionalLong.of(201), 
store.translateDownstream(null, tp, 200));
   }
   ```
   Either of these is preferable to having the warning or suppressing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]

2024-05-21 Thread via GitHub


ableegoldman commented on code in PR #16024:
URL: https://github.com/apache/kafka/pull/16024#discussion_r1608950503


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java:
##
@@ -0,0 +1,45 @@
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This is a simple container class used during the assignment process to 
distinguish
+ * TopicPartitions type. Since the assignment logic can depend on the type of 
topic we're
+ * looking at, and the rack information of the partition, this container class 
should have
+ * everything necessary to make informed task assignment decisions.
+ */
+public interface TopicPartitionAssignmentInfo {

Review Comment:
   I mean I do agree on keeping it scoped to the assignment, and putting it 
under the assignment package should be effective enough. I was more thinking 
along the lines of marking a clear distinction between interfaces/metadata that 
falls under the `ApplicationState`/input status vs things that fall under the 
`TaskAssignment`/output category. Unfortunately "assignment" is just an 
overloaded term, but since it's correlated with assignor output in other class 
names, I think it's best not to use it in class names to also mean "related to 
the assignor" since putting it under the assignment package is another way to 
signal that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16809: Run Javadoc in CI [kafka]

2024-05-21 Thread via GitHub


gharris1727 opened a new pull request, #16025:
URL: https://github.com/apache/kafka/pull/16025

   Until now, javadoc warnings could be introduced in PRs and were only noticed 
when someone else ran `./gradlew javadoc`. Now, introducing problems in javadocs 
will immediately fail the PR CI build, notifying the contributor/committer when 
a PR has a javadoc regression.
   
   This only happens in JDK15+ which adds the `-Werror` parameter.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16739: Exclude protected members from aggregated release Javadocs [kafka]

2024-05-21 Thread via GitHub


gharris1727 merged PR #15940:
URL: https://github.com/apache/kafka/pull/15940


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16739: Exclude protected members from aggregated release Javadocs [kafka]

2024-05-21 Thread via GitHub


gharris1727 commented on PR #15940:
URL: https://github.com/apache/kafka/pull/15940#issuecomment-2123422940

   Build passes, CI test failures appear unrelated, and this passes for me 
locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608934073


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
 }
 
 @Test
-void testEnsureEventsAreCompleted() {

Review Comment:
   Yes, the test was a little suspect in terms of its value-add, so I'd removed 
it.
   
   I was planning to file a Jira to move several of the tests (including this 
one) from `ConsumerNetworkThreadTest` to `ApplicationEventProcessorTest`. Then 
we could fix up some of the funkiness in this test as a separate task.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16558) Implement HeartbeatRequestState.toStringBase()

2024-05-21 Thread Brenden DeLuna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brenden DeLuna reassigned KAFKA-16558:
--

Assignee: Brenden DeLuna  (was: Kirk True)

> Implement HeartbeatRequestState.toStringBase()
> --
>
> Key: KAFKA-16558
> URL: https://issues.apache.org/jira/browse/KAFKA-16558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The inner class {{HeartbeatRequestState}} does not override the 
> {{toStringBase()}} method. This affects debugging and troubleshooting 
> consumer issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16557) Fix OffsetFetchRequestState.toString()

2024-05-21 Thread Brenden DeLuna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brenden DeLuna reassigned KAFKA-16557:
--

Assignee: Brenden DeLuna  (was: Kirk True)

> Fix OffsetFetchRequestState.toString()
> --
>
> Key: KAFKA-16557
> URL: https://issues.apache.org/jira/browse/KAFKA-16557
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The code incorrectly overrides the {{toString()}} method instead of 
> overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting 
> consumer issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16000) Migrate MembershipManagerImplTest away from ConsumerTestBuilder

2024-05-21 Thread Brenden DeLuna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brenden DeLuna reassigned KAFKA-16000:
--

Assignee: Brenden DeLuna  (was: Kirk True)

> Migrate MembershipManagerImplTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16000
> URL: https://issues.apache.org/jira/browse/KAFKA-16000
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder

2024-05-21 Thread Brenden DeLuna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brenden DeLuna reassigned KAFKA-15999:
--

Assignee: Brenden DeLuna  (was: Kirk True)

> Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
> -
>
> Key: KAFKA-15999
> URL: https://issues.apache.org/jira/browse/KAFKA-15999
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder

2024-05-21 Thread Brenden DeLuna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brenden DeLuna reassigned KAFKA-16001:
--

Assignee: Brenden DeLuna

> Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16001
> URL: https://issues.apache.org/jira/browse/KAFKA-16001
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder

2024-05-21 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16001:
---
Description: 
We should:
 # Remove spy calls to the dependencies
 # Remove ConsumerNetworkThreadTest

> Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16001
> URL: https://issues.apache.org/jira/browse/KAFKA-16001
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>
> We should:
>  # Remove spy calls to the dependencies
>  # Remove ConsumerNetworkThreadTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]

2024-05-21 Thread via GitHub


apourchet commented on code in PR #16024:
URL: https://github.com/apache/kafka/pull/16024#discussion_r1608924234


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java:
##
@@ -0,0 +1,45 @@
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This is a simple container class used during the assignment process to 
distinguish
+ * TopicPartitions type. Since the assignment logic can depend on the type of 
topic we're
+ * looking at, and the rack information of the partition, this container class 
should have
+ * everything necessary to make informed task assignment decisions.
+ */
+public interface TopicPartitionAssignmentInfo {

Review Comment:
   To elaborate on what I said: I believe that, generally speaking, interfaces 
that are too broad when created get super bloated over time, so I usually err 
on the side of small and precise and expand if necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on code in PR #16006:
URL: https://github.com/apache/kafka/pull/16006#discussion_r1608921813


##
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##
@@ -350,11 +347,7 @@ class ZkMetadataCache(
 
   override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
Option[Node] = {
 val snapshot = metadataSnapshot
-brokerId match {
-  case id if 
snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id
 == id) =>
-kraftControllerNodeMap.get(id)
-  case _ => 
snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
-}
+snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))

Review Comment:
   I recall why we did this now (I didn't originally)...
   
   During ZK migration, we wanted a way to make the controller node provider 
work with both ZK and KRaft controllers.
   
   The correct way to do this is just to consult `RaftManager` for the endpoint 
when we're dealing with a KRaft controller (which we do know)
   
   But at the time, we "hacked" it by adding the controllers to the cache as 
"brokers" for some functions but not others (ew)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]

2024-05-21 Thread via GitHub


apourchet commented on code in PR #16024:
URL: https://github.com/apache/kafka/pull/16024#discussion_r1608918902


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java:
##
@@ -0,0 +1,45 @@
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This is a simple container class used during the assignment process to 
distinguish
+ * TopicPartitions type. Since the assignment logic can depend on the type of 
topic we're
+ * looking at, and the rack information of the partition, this container class 
should have
+ * everything necessary to make informed task assignment decisions.
+ */
+public interface TopicPartitionAssignmentInfo {

Review Comment:
   It makes sense, I thought that since it's metadata that will uniquely be 
used for task assignment I was OK having `Assignment` in the name, but I'll 
change to `TaskTopicPartition`  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on PR #16008:
URL: https://github.com/apache/kafka/pull/16008#issuecomment-2123394515

   > Are there any tests that we should add or do the existing test cover this 
functionality?
   
   I think there are some that cover it already, but I added a test of 
controller failover in `KRaftClusterTest`, just to be sure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]

2024-05-21 Thread via GitHub


ableegoldman commented on code in PR #16024:
URL: https://github.com/apache/kafka/pull/16024#discussion_r1608917448


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This is a simple container class used during the assignment process to 
distinguish
+ * TopicPartitions type. Since the assignment logic can depend on the type of 
topic we're
+ * looking at, and the rack information of the partition, this container class 
should have
+ * everything necessary to make informed task assignment decisions.
+ */
+public interface TopicPartitionAssignmentInfo {
+/**
+ *
+ * @return the string name of the topic.
+ */
+String topic();
+
+/**
+ *
+ * @return the partition id of this topic partition.
+ */
+int partition();

Review Comment:
   nit: I'd return the TopicPartition directly rather than having APIs for the 
partition and topic separately
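   
   A minimal sketch of that suggestion (hypothetical, not the actual PR code; 
the other accessors from the PR are elided):
   ```java
   import org.apache.kafka.common.TopicPartition;
   
   public interface TopicPartitionAssignmentInfo {
       /** @return the topic partition this metadata describes. */
       TopicPartition topicPartition();
   
       // other accessors (e.g. rack information) would stay as in the PR
   }
   ```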



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]

2024-05-21 Thread via GitHub


ableegoldman commented on code in PR #16024:
URL: https://github.com/apache/kafka/pull/16024#discussion_r1608911390


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java:
##
@@ -0,0 +1,45 @@
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This is a simple container class used during the assignment process to 
distinguish
+ * TopicPartitions type. Since the assignment logic can depend on the type of 
topic we're
+ * looking at, and the rack information of the partition, this container class 
should have
+ * everything necessary to make informed task assignment decisions.
+ */
+public interface TopicPartitionAssignmentInfo {

Review Comment:
   the only thing I take issue with is the name here -- technically this is 
just application metadata about how this topic partition fits into the 
topology, ie it's not really "assignment" info -- we're just using it for 
assignment purposes
   
   ...I'm not sure I'm explaining that well and it sounds overly pedantic, but 
if I didn't bring it up now then someone probably would say the same thing on 
the KIP discussion once we send out this update.
   
   I know `TopicPartitionInfo` is already taken, and is too broad anyways. I'd 
recommend something like `TaskTopicPartition` which I think helps drive home 
the point that this is metadata related to how this topic partition relates to 
the given task, not info about the topic partition itself per se.
   
   Does that make any sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
 }
 
 @Test
-void testEnsureEventsAreCompleted() {

Review Comment:
   Actually seems to me that we shouldn't have this test here (and maybe this 
is why @kirktrue removed it before?). As I see it, this unit test is testing 
something that is not the `ConsumerNetworkThread`'s responsibility (and that's 
why it ends up being complicated, having to mimic the reaper behaviour and 
spying). It is testing that events are completed, and that's the reaper.reap 
responsibility, so seems to me we need to:
   
   1. test that the `ConsumerNetworkThread` calls the reaper with the full list 
of events -> done already in the 
[testCleanupInvokesReaper](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java#L331)
   2. test that the `CompletableEventReaper.reap(Collection events)` 
completes the events -> done in CompletableEventReaperTest 
([testIncompleteQueue](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L135)
 and 
[testIncompleteTracked](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L170))
 
   
   In the end, as it is, we end up asserting a behaviour we're mocking 
ourselves in the `doAnswer`, so not much value I would say? Agree with @cadonna 
that we need coverage, but I would say that we have it, on my points 1 and 2, 
and this should be removed. Makes sense? 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-21 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1608908254


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
  * @param contextThe request context.
  * @param requestThe actual Heartbeat request.
  *
- * @return The Heartbeat response.
+ * @return The coordinator result that contains the heartbeat response.
  */
-public HeartbeatResponseData classicGroupHeartbeat(
+public CoordinatorResult 
classicGroupHeartbeat(

Review Comment:
   I guess because now we'll also read from the ConsumerGroup-related things, 
which are timeline data structures. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608907673


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
   on another note, if you approve I'd also backport the fix to 3.7 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1608907374


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -313,4 +314,8 @@ class KafkaRaftManager[T](
   override def leaderAndEpoch: LeaderAndEpoch = {
 client.leaderAndEpoch
   }
+
+  override def idToNode(id: Int, listener: String): Option[Node] = {
+client.idToNode(id, listener).toScala
+  }

Review Comment:
   ack



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() {
 }
 }
 
+public Optional<Node> idToNode(int id, String listener) {

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-21 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1608907132


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult 
classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.hasUnreleasedPartitions(member))) {
+error = Errors.REBALANCE_IN_PROGRESS;
+scheduleConsumerGroupJoinTimeout(groupId, memberId, 
member.rebalanceTimeoutMs());

Review Comment:
   > we cancel the join timeout when we first convert to consumer group
   
   We don't cancel the timeout in case the conversion fails and the state needs 
to be reverted. The classic group join timeout does nothing if the group is a 
consumer group.
   
   > when we have a group epoch bump we tell the classic group member we're 
rebalancing and they should send a join request
   
   Yes correct, and the timeout here is for the member instead of the whole 
group. For each member, the rebalance will be something like (rough sketch below):
   - heartbeat -- if there's an ongoing rebalance, schedule the join timeout
   - join -- cancel the join timeout; schedule the sync timeout
   - sync -- cancel the sync timeout; maybe schedule a join timeout if a new 
rebalance is ongoing
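   
   To make that ordering concrete, here's a rough, self-contained sketch of the 
per-member scheduling. The class and method names are hypothetical and it uses a 
plain `ScheduledExecutorService` instead of the coordinator timer, so it only 
illustrates the order of the schedule/cancel calls, not the actual 
`GroupMetadataManager` code:
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   
   // Illustrative only: hypothetical names, a plain executor instead of the
   // coordinator timer, and "fencing" reduced to a log line.
   public class MemberRebalanceTimeouts {
       private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
       private final Map<String, ScheduledFuture<?>> joinTimeouts = new ConcurrentHashMap<>();
       private final Map<String, ScheduledFuture<?>> syncTimeouts = new ConcurrentHashMap<>();
   
       // heartbeat: if a rebalance is in progress for this member, schedule the join timeout.
       public void onHeartbeat(String memberId, boolean rebalanceInProgress, long rebalanceTimeoutMs) {
           if (rebalanceInProgress) {
               schedule(joinTimeouts, memberId, rebalanceTimeoutMs);
           }
       }
   
       // join: cancel the join timeout and schedule the sync timeout.
       public void onJoin(String memberId, long rebalanceTimeoutMs) {
           cancel(joinTimeouts, memberId);
           schedule(syncTimeouts, memberId, rebalanceTimeoutMs);
       }
   
       // sync: cancel the sync timeout; schedule a new join timeout only if
       // another rebalance is already ongoing for this member.
       public void onSync(String memberId, boolean newRebalanceOngoing, long rebalanceTimeoutMs) {
           cancel(syncTimeouts, memberId);
           if (newRebalanceOngoing) {
               schedule(joinTimeouts, memberId, rebalanceTimeoutMs);
           }
       }
   
       private void schedule(Map<String, ScheduledFuture<?>> timeouts, String memberId, long timeoutMs) {
           cancel(timeouts, memberId); // replace any previously scheduled timeout for this member
           timeouts.put(memberId, timer.schedule(
               () -> System.out.println("member " + memberId + " timed out, fence it"),
               timeoutMs, TimeUnit.MILLISECONDS));
       }
   
       private void cancel(Map<String, ScheduledFuture<?>> timeouts, String memberId) {
           ScheduledFuture<?> future = timeouts.remove(memberId);
           if (future != null) {
               future.cancel(false);
           }
       }
   }
   ```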



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608896699


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
   Hi @gharris1727 
   I use IntelliJ too and saw the warning. 
   I could have used a `@suppress` annotation, but I am very reluctant to make 
code less readable because of a linter's limited insight, and similarly reluctant 
to make the fake store more complex.
   
   Using try-with-resources with a local class results in horrible indentation, 
as you said.
   I don't share a strong worry about future leaks in testing - that seems 
speculative to me.
   In this instance, unless you have very strong feelings, I'd really leave the 
test as-is. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1608906143


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -366,4 +368,16 @@ public static VoterSet fromInetSocketAddresses(String 
listener, Map idToNode(int id, String listener) {
+VoterNode voterNode = voters.get(id);
+if (voterNode == null) {
+return Optional.empty();
+}
+InetSocketAddress address = voterNode.listeners.get(listener);
+if (address == null) {
+return Optional.empty();
+}
+return Optional.of(new Node(id, address.getHostString(), 
address.getPort()));
+}

Review Comment:
   ok



##
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##
@@ -548,6 +550,10 @@ public VotedState votedStateOrThrow() {
 .orElseThrow(() -> new IllegalStateException("Expected to be 
Voted, but current state is " + state));
 }
 
+public Optional<Node> idToNode(int id, String listener) {
+return latestVoterSet.get().idToNode(id, listener);
+}

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1608904666


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() {
 }
 }
 
+public Optional<Node> idToNode(int id, String listener) {
+return quorum.idToNode(id, listener);

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers [kafka]

2024-05-21 Thread via GitHub


jsancio commented on code in PR #16006:
URL: https://github.com/apache/kafka/pull/16006#discussion_r1608902290


##
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##
@@ -350,11 +347,7 @@ class ZkMetadataCache(
 
   override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
Option[Node] = {
 val snapshot = metadataSnapshot
-brokerId match {
-  case id if 
snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id
 == id) =>
-kraftControllerNodeMap.get(id)
-  case _ => 
snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
-}
+snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))

Review Comment:
   Interesting. Do you know why this was added in the first place? @akhileshchg 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure [kafka]

2024-05-21 Thread via GitHub


ableegoldman merged PR #16002:
URL: https://github.com/apache/kafka/pull/16002


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure [kafka]

2024-05-21 Thread via GitHub


ableegoldman commented on PR #16002:
URL: https://github.com/apache/kafka/pull/16002#issuecomment-2123367972

   Test failures are unrelated, merging to trunk
   
   Worth noting that the 
`org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest`
 did fail, which may seem related, but cannot be since this PR only adds new 
methods. Furthermore, I have seen this test fail occasionally in recent weeks, 
and while this is probably a real bug/failure (see 
[comment](https://issues.apache.org/jira/browse/KAFKA-16586?focusedCommentId=17848363=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17848363)),
 it seems to have been flaky since before we started work on KIP-924 so it must 
be unrelated. Just mentioning this in case we see this fail on future KIP-924 
PRs that do more than just add new methods


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


jsancio commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1608891761


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() {
 }
 }
 
+public Optional<Node> idToNode(int id, String listener) {
+return quorum.idToNode(id, listener);

Review Comment:
   Let's use `partitionState` directly and remove the changes to `QuorumState`.
   ```java
   return partitionState.latestVoterSet().idToNode(id, listener);
   ```



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() {
 }
 }
 
+public Optional<Node> idToNode(int id, String listener) {

Review Comment:
   Similar to my other comments but how about `voterNode` for the name of the 
method?



##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -313,4 +314,8 @@ class KafkaRaftManager[T](
   override def leaderAndEpoch: LeaderAndEpoch = {
 client.leaderAndEpoch
   }
+
+  override def idToNode(id: Int, listener: String): Option[Node] = {
+client.idToNode(id, listener).toScala
+  }

Review Comment:
   I'd prefer we call this something like `voterNode`, as it makes it clear 
that this only resolves to an `Option[Node]` if the id is a voter/controller.



##
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##
@@ -548,6 +550,10 @@ public VotedState votedStateOrThrow() {
 .orElseThrow(() -> new IllegalStateException("Expected to be 
Voted, but current state is " + state));
 }
 
+public Optional<Node> idToNode(int id, String listener) {
+return latestVoterSet.get().idToNode(id, listener);
+}

Review Comment:
   See my other comment but I think we can remove this.



##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -366,4 +368,16 @@ public static VoterSet fromInetSocketAddresses(String 
listener, Map idToNode(int id, String listener) {
+VoterNode voterNode = voters.get(id);
+if (voterNode == null) {
+return Optional.empty();
+}
+InetSocketAddress address = voterNode.listeners.get(listener);
+if (address == null) {
+return Optional.empty();
+}
+return Optional.of(new Node(id, address.getHostString(), 
address.getPort()));
+}

Review Comment:
   Okay. I added a similar method in this PR: 
https://github.com/apache/kafka/pull/15986/files#diff-7164c449a4cc53dd28cc1a7201fa8b7a824749dab013fb33dff101a1002565daR86-R101
   
   If you agree, do you mind changing the name to `voterNode` to minimize the 
conflicts? Don't worry about using `ListenerName` instead of `String`, as my PR 
deals with that change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
 }
 
 @Test
-void testEnsureEventsAreCompleted() {

Review Comment:
   Actually seems to me that we shouldn't have this test here (and maybe this 
is why @kirktrue removed it before?). As I see it, this unit test is testing 
something that is not the `ConsumerNetworkThread`'s responsibility (and that's 
why it ends up being complicated, having to mimic the reaper behaviour and 
spying). It is testing that events are completed, and that's the reaper.reap 
responsibility, so seems to me we need to:
   
   1. test that the `ConsumerNetworkThread` calls the reaper with the full list 
of events -> done already in the 
[testCleanupInvokesReaper](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java#L331)
   2. test that the `CompletableEventReaper.reap(Collection events)` 
completes the events, and that's done in CompletableEventReaperTest 
([testIncompleteQueue](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L135)
 and 
[testIncompleteTracked](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L170))
 
   
   In the end, as it is, we end up asserting a behaviour we're mocking 
ourselves in the `doAnswer`, so not much value I would say? Agree with @cadonna 
that we need coverage, but I'd say we already have it via points 1 and 2, so 
this test should be removed. Makes sense? 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
 }
 
 @Test
-void testEnsureEventsAreCompleted() {

Review Comment:
   Actually seems to me that we shouldn't have this test here (and maybe this 
is why @kirktrue removed it before?). As I see it, this unit test is testing 
something that is not the `ConsumerNetworkThread`'s responsibility (and that's 
why it ends up being complicated, having to mimic the reaper behaviour and 
spying). It is testing that events are completed, and that's the reaper.reap 
responsibility, so seems to me we need to:
   
   1. test that the `ConsumerNetworkThread` calls the reaper with the full list 
of events -> done already in the 
[testCleanupInvokesReaper](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java#L331)
   2. test that the `CompletableEventReaper.reap(Collection events)` 
completes the events, and that's done in CompletableEventReaperTest 
([testIncompleteQueue](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L135)
 and 
[testIncompleteTracked](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L170))
 
   
   In the end, as it is, we end up asserting a behaviour we're mocking 
ourselves in the `doAnswer`, so not much value I would say? Agree with @cadonna 
that we need coverage, but I would say that's what we have on my points 1 and 
2, and this should be removed. Makes sense? 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
 }
 
 @Test
-void testEnsureEventsAreCompleted() {

Review Comment:
   Actually seems to me that we shouldn't have this test here (and maybe this 
is why @kirktrue removed it before?). As I see it, this unit test is testing 
something that is not the `ConsumerNetworkThread`'s responsibility (and that's 
why it ends up being complicated, having to mimic the reaper behaviour and 
spying). It is testing that events are completed, and that's the reaper.reap 
responsibility, so seems to me we need to:
   
   1. test that the `ConsumerNetworkThread` calls the reaper with the full list 
of events -> done already in the 
[testCleanupInvokesReaper](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java#L331)
   2. test that the `CompletableEventReaper.reap(Collection events)` 
completes the events, and that's done in CompletableEventReaperTest 
([testIncompleteQueue](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L135)
 and 
[testIncompleteTracked](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L170))
 
   
   In the end, as it is, we end up asserting a behaviour we're mocking 
ourselves in the `doAnswer`, so not much value I would say? Agree with @cadonna 
that we need coverage, but I would say that's what we have on my points 1 and 
2, and this should be removed. Makes sense? 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608896699


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
   Hi @gharris1727 I use IntelliJ too and see the warning. I am reluctant to 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16809) Run javadoc build in CI

2024-05-21 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16809:
---

 Summary: Run javadoc build in CI
 Key: KAFKA-16809
 URL: https://issues.apache.org/jira/browse/KAFKA-16809
 Project: Kafka
  Issue Type: Task
  Components: build, docs
Reporter: Greg Harris
Assignee: Greg Harris


The `javadoc` target isn't run during CI builds, allowing javadoc errors to 
leak in.

Instead, we can run javadoc, like checkstyle, spotbugs, and import control, as a 
pre-test step, to ensure that PRs don't introduce javadoc build regressions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608875582


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -329,6 +334,17 @@ public void testCommitAsyncWithFencedException() {
 
commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
 
 assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> 
consumer.commitAsync());
+
+// Close the consumer here as we know it will cause a 
FencedInstanceIdException to be thrown.
+// If we get an error other than the FencedInstanceIdException, we'll 
raise a ruckus.
+try {
+consumer.close();
+} catch (KafkaException e) {
+assertNotNull(e.getCause());
+assertInstanceOf(FencedInstanceIdException.class, e.getCause());
+} finally {
+consumer = null;
+}

Review Comment:
   Yes, it turns out that changes made elsewhere have obviated the need for 
this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608870767


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -329,6 +334,17 @@ public void testCommitAsyncWithFencedException() {
 
commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
 
 assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> 
consumer.commitAsync());
+
+// Close the consumer here as we know it will cause a 
FencedInstanceIdException to be thrown.
+// If we get an error other than the FencedInstanceIdException, we'll 
raise a ruckus.
+try {
+consumer.close();
+} catch (KafkaException e) {
+assertNotNull(e.getCause());
+assertInstanceOf(FencedInstanceIdException.class, e.getCause());
+} finally {
+consumer = null;
+}

Review Comment:
   How did we resolve this? I see the section got completely removed - is the 
verification not needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16713: Define initial set of RPCs for KIP-932 [kafka]

2024-05-21 Thread via GitHub


AndrewJSchofield commented on PR #16022:
URL: https://github.com/apache/kafka/pull/16022#issuecomment-2123317868

   Yes, we will wait until we cut the 3.8 branch before merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-05-21 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848363#comment-17848363
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16586:


By the way – if this test is failing, it's highly likely that this is a real 
bug. This test was designed to be completely deterministic (hence the seed) and 
doesn't set up any actual clients or clusters or anything that might make it 
flaky. [~mjsax] if you see this again can you report the failure here 
(including both the seed and rackAwareStrategy parameter)?

> Test TaskAssignorConvergenceTest failing
> 
>
> Key: KAFKA-16586
> URL: https://issues.apache.org/jira/browse/KAFKA-16586
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> {code:java}
> java.lang.AssertionError: Assertion failed in randomized test. Reproduce 
> with: `runRandomizedScenario(-538095696758490522)`.  at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
>   at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
> This might expose an actual bug (or incorrect test setup) and should be 
> reproducible (did not try it myself yet).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-05-21 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848358#comment-17848358
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16586:


Note: when reporting an instance of failure for this test, please include both 
the seed (contained in the stack trace, eg 
"runRandomizedScenario(-3595932977400264775)") and the full test name including 
parameters (not present in the original ticket description, eg 
"rackAwareStrategy=balance_subtopology")

We need the test name to figure out which variant of the assignor was running. 
This will help narrow down the issue to a specific rackAwareStrategy, as well 
as enable reproduction of the failed test

> Test TaskAssignorConvergenceTest failing
> 
>
> Key: KAFKA-16586
> URL: https://issues.apache.org/jira/browse/KAFKA-16586
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> {code:java}
> java.lang.AssertionError: Assertion failed in randomized test. Reproduce 
> with: `runRandomizedScenario(-538095696758490522)`.  at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
>   at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
> This might expose an actual bug (or incorrect test setup) and should be 
> reproducible (did not try it myself yet).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on PR #16008:
URL: https://github.com/apache/kafka/pull/16008#issuecomment-2123296527

   I have re-run all the test failures locally, and they all passed (they were 
flakes)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16343: Add unit tests of foreignKeyJoin classes [kafka]

2024-05-21 Thread via GitHub


wcarlson5 merged PR #15564:
URL: https://github.com/apache/kafka/pull/15564


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-21 Thread via GitHub


rreddy-22 commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1608849959


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) {
 return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
 }
 
+/**
+ * @return An immutable map containing all the topic partitions
+ * with their current member assignments.
+ */
+public Map> partitionAssignments() {
+return Collections.unmodifiableMap(partitionAssignments);
+}
+
 /**
  * Updates target assignment of a member.
  *
  * @param memberId  The member id.
  * @param newTargetAssignment   The new target assignment.
  */
 public void updateTargetAssignment(String memberId, Assignment 
newTargetAssignment) {
+updatePartitionAssignments(
+memberId,
+targetAssignment.getOrDefault(memberId, new 
Assignment(Collections.emptyMap())),
+newTargetAssignment
+);
 targetAssignment.put(memberId, newTargetAssignment);
 }
 
+/**
+ * Updates partition assignments of the topics.
+ *
+ * @param memberId  The member Id.
+ * @param oldTargetAssignment   The old target assignment.
+ * @param newTargetAssignment   The new target assignment.
+ *
+ * Package private for testing.
+ */
+void updatePartitionAssignments(

Review Comment:
   used it in tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16343: Add unit tests of foreignKeyJoin classes [kafka]

2024-05-21 Thread via GitHub


wcarlson5 commented on PR #15564:
URL: https://github.com/apache/kafka/pull/15564#issuecomment-2123293255

   The failing tests are not related. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-05-21 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848351#comment-17848351
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16586:


Failed again:

 
{code:java}
randomClusterPerturbationsShouldConverge[rackAwareStrategy=balance_subtopology] 
– 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest

Stacktracejava.lang.AssertionError: Assertion failed in randomized test. 
Reproduce with: `runRandomizedScenario(-3595932977400264775)`. {code}
 

> Test TaskAssignorConvergenceTest failing
> 
>
> Key: KAFKA-16586
> URL: https://issues.apache.org/jira/browse/KAFKA-16586
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> {code:java}
> java.lang.AssertionError: Assertion failed in randomized test. Reproduce 
> with: `runRandomizedScenario(-538095696758490522)`.  at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
>   at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
> This might expose an actual bug (or incorrect test setup) and should be 
> reproducible (did not try it myself yet).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14517) Implement regex subscriptions

2024-05-21 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848348#comment-17848348
 ] 

Lianet Magrans commented on KAFKA-14517:


Sure, I'll assign it to me then. Thanks [~phuctran] !

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14517) Implement regex subscriptions

2024-05-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-14517:
--

Assignee: Lianet Magrans  (was: Phuc Hong Tran)

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608834195


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
-emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-  else {
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+// We need to be careful before reading the segment as 
`maxOffsetMetadata` may not be a complete metadata:
+// 1. If maxOffsetMetadata is message-offset-only, then return empty 
fetchDataInfo since
+// maxOffsetMetadata.offset is not on local log segments.
+// 2. If maxOffsetMetadata.segmentBaseOffset is smaller than 
segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
   This comment is pending. Will wait for @showuon review before applying the 
change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608831211


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
   When applying this change, the newly added 
LocalLog#testWhenFetchOffsetHigherThanMaxOffset test fails for case-3 and 
case-4, i.e. which offset should be returned in the FetchDataInfo when the 
conditions aren't met?
   
   1. In the current approach, the same fetch-offset is returned in the 
FetchDataInfo
   2. In the suggested approach, the next-offset/log-end-offset is returned in 
the FetchDataInfo
   
   I'm not sure which one is correct. Please suggest. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608831211


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
   When applying this change, the newly added 
LocalLog#testWhenFetchOffsetHigherThanMaxOffset test fails for case-3 and 
case-4, i.e. which offset should be returned in the FetchDataInfo when the 
conditions aren't met?
   
   1. In the current approach, the same fetch-offset is returned in the 
FetchDataInfo
   2. In the suggested approach, the next-offset/log-end-offset is returned in 
the FetchDataInfo



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16626: Lazily convert subscribed topic names to topic ids [kafka]

2024-05-21 Thread via GitHub


dajac commented on code in PR #15970:
URL: https://github.com/apache/kafka/pull/15970#discussion_r1608815939


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##
@@ -53,6 +55,13 @@ public class OptimizedUniformAssignmentBuilderTest {
 private final String memberB = "B";
 private final String memberC = "C";
 
+private final TopicsImage topicsImage = new MetadataImageBuilder()

Review Comment:
   Do we still need this one?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicIds.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * TopicIds is initialized with topic names (String) but exposes a Set of 
topic ids (Uuid) to the
+ * user and performs the conversion lazily with TopicsImage.
+ */
+public class TopicIds implements Set<Uuid> {
+private final Set<String> topicNames;
+private final TopicsImage image;
+
+public TopicIds(
+Set<String> topicNames,
+TopicsImage image
+) {
+this.topicNames = Objects.requireNonNull(topicNames);
+this.image = Objects.requireNonNull(image);
+}
+
+@Override
+public int size() {
+return topicNames.size();
+}
+
+@Override
+public boolean isEmpty() {
+return topicNames.isEmpty();
+}
+
+@Override
+public boolean contains(Object o) {
+if (o instanceof Uuid) {
+Uuid topicId = (Uuid) o;
+TopicImage topicImage = image.getTopic(topicId);
+if (topicImage == null) return false;
+return topicNames.contains(topicImage.name());
+}
+return false;
+}
+
+private static class TopicIdIterator implements Iterator<Uuid> {
+final Iterator<String> iterator;
+final TopicsImage image;
+private Uuid next = null;
+
+private TopicIdIterator(
+Iterator<String> iterator,
+TopicsImage image
+) {
+this.iterator = Objects.requireNonNull(iterator);
+this.image = Objects.requireNonNull(image);
+}
+
+@Override
+public boolean hasNext() {
+if (next != null) return true;
+Uuid result = null;
+do {
+if (!iterator.hasNext()) {
+return false;
+}
+String next = iterator.next();
+TopicImage topicImage = image.getTopic(next);
+if (topicImage != null) {
+result = topicImage.id();
+}
+} while (result == null);
+next = result;
+return true;
+}
+
+@Override
+public Uuid next() {
+if (!hasNext()) throw new NoSuchElementException();
+Uuid result = next;
+next = null;
+return result;
+}
+}
+
+@Override
+public Iterator<Uuid> iterator() {
+return new TopicIdIterator(topicNames.iterator(), image);
+}
+
+@Override
+public Object[] toArray() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public <T> T[] toArray(T[] a) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean add(Uuid o) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean remove(Object o) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean addAll(Collection<? extends Uuid> c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void clear() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean removeAll(Collection<?> c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean 

Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608823231


##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/EraseBrokerStorageAction.java:
##
@@ -19,24 +19,35 @@
 import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
 
 public final class EraseBrokerStorageAction implements TieredStorageTestAction 
{
 
 private final int brokerId;
+private final FilenameFilter filenameFilter;
+private final boolean isStopped;
 
 public EraseBrokerStorageAction(int brokerId) {
+this(brokerId, (dir, name) -> true, false);
+}
+
+public EraseBrokerStorageAction(int brokerId,
+FilenameFilter filenameFilter,
+boolean isStopped) {
 this.brokerId = brokerId;
+this.filenameFilter = filenameFilter;
+this.isStopped = isStopped;
 }
 
 @Override
 public void doExecute(TieredStorageTestContext context) throws IOException 
{
-context.eraseBrokerStorage(brokerId);
+context.eraseBrokerStorage(brokerId, filenameFilter, isStopped);
 }
 
 @Override
 public void describe(PrintStream output) {
-output.println("erase-broker-storage: " + brokerId);
+output.println("erase-broker-storage: " + brokerId + ", isStopped: " + 
isStopped);

Review Comment:
   `filenameFilter` is a lambda expression; it would just print as the object 
name in the log, so I omitted it from the output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16804: Replace archivesBaseName with archivesName. [kafka]

2024-05-21 Thread via GitHub


gharris1727 commented on PR #16016:
URL: https://github.com/apache/kafka/pull/16016#issuecomment-2123252147

   I verified that this change resolves the relevant build warnings, and 
produces a published result identical to the current trunk implementation.
   
   Thank you @frankvicky for picking this up!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16804: Replace archivesBaseName with archivesName. [kafka]

2024-05-21 Thread via GitHub


gharris1727 commented on code in PR #16016:
URL: https://github.com/apache/kafka/pull/16016#discussion_r1608820947


##
build.gradle:
##
@@ -341,7 +341,7 @@ subprojects {
 artifact task
 }
 
-artifactId = archivesBaseName
+artifactId = 
project.extensions.findByType(BasePluginExtension)?.archivesName?.get()

Review Comment:
   I think this has the same effect, and looks a bit better. The "safe 
navigation operator" `?` also makes me think that this could be null, when it 
really should never be null, and the publishing plugin can't handle a null.
   ```suggestion
   artifactId = base.archivesName.get()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15242.
-
  Assignee: (was: Alexander Aghili)
Resolution: Duplicate

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Priority: Major
>
> Using the mock processor context to get the forwarded message doesn't work.
> Also, there is no well-documented way of testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but the most important piece is the test file with runtime and compile-time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15258) Consider moving MockAdminClient to the public API

2024-05-21 Thread Muralidhar Basani (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muralidhar Basani reassigned KAFKA-15258:
-

Assignee: Muralidhar Basani

> Consider moving MockAdminClient to the public API
> -
>
> Key: KAFKA-15258
> URL: https://issues.apache.org/jira/browse/KAFKA-15258
> Project: Kafka
>  Issue Type: Task
>  Components: admin
>Reporter: Mickael Maison
>Assignee: Muralidhar Basani
>Priority: Major
>  Labels: need-kip
>
> MockConsumer and MockProducer are part of the public API. They are useful for 
> developers wanting to test their applications. On the other hand 
> MockAdminClient is not part of the public API (it's under test). We should 
> consider moving it to src so users can also easily test applications that 
> depend on Admin.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Move Throttler to storage module [kafka]

2024-05-21 Thread via GitHub


chia7712 commented on code in PR #16023:
URL: https://github.com/apache/kafka/pull/16023#discussion_r1608736058


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -109,12 +110,12 @@ class LogCleaner(initialConfig: CleanerConfig,
   private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, 
logDirFailureChannel)
 
   /* a throttle used to limit the I/O of all the cleaner threads to a 
user-specified maximum rate */
-  private[log] val throttler = new Throttler(desiredRatePerSec = 
config.maxIoBytesPerSecond,
-checkIntervalMs = 300,
-throttleDown = true,
-"cleaner-io",
-"bytes",
-time = time)
+  private[log] val throttler = new Throttler(config.maxIoBytesPerSecond,
+ 300,
+ true,

Review Comment:
   It seems `throttleDown` is always true in production. Maybe we can remove 
it? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]

2024-05-21 Thread via GitHub


apourchet opened a new pull request, #16024:
URL: https://github.com/apache/kafka/pull/16024

   For task assignment purposes, the user needs a set of information for each 
topic partition that affects the desired tasks.
   
   This PR introduces a new interface for a read-only container class that 
gathers all the important and relevant information in one place.
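   
   As a purely illustrative sketch (the accessor names below are assumptions, not 
necessarily the interface added by this PR), such a read-only container could 
look roughly like:
   ```java
   import java.util.Optional;
   import java.util.Set;
   
   import org.apache.kafka.common.TopicPartition;
   
   // Hypothetical shape only; it just illustrates gathering the per-partition
   // facts needed for task assignment in one read-only place.
   public interface TopicPartitionAssignmentInfo {
       TopicPartition topicPartition(); // which topic partition this describes
       Optional<Set<String>> rackIds(); // racks hosting replicas, if rack info is available
       boolean isSource();              // the partition is read as an input by some task
       boolean isChangelog();           // the partition backs a state store changelog
   }
   ```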
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2123163240

   @lianetm @cadonna—The latest batch of feedback has been addressed. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608732398


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1848,6 +1821,40 @@ private void subscribeInternal(Collection topics, Optional processor) {

Review Comment:
   Done. That's much better 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-21 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608730407


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * {@code CompletableEventReaper} is responsible for tracking {@link CompletableEvent time-bound events} and removing
+ * any that exceed their {@link CompletableEvent#deadlineMs() deadline} (unless they've already completed). This
+ * mechanism is used by the {@link AsyncKafkaConsumer} to enforce the timeout provided by the user in its API
+ * calls (e.g. {@link AsyncKafkaConsumer#commitSync(Duration)}).
+ */
+public class CompletableEventReaper {
+
+private final Logger log;
+
+/**
+ * List of tracked events that are candidates for expiration.
+ */
+private final List<CompletableEvent<?>> tracked;
+
+public CompletableEventReaper(LogContext logContext) {
+this.log = logContext.logger(CompletableEventReaper.class);
+this.tracked = new ArrayList<>();
+}
+
+/**
+ * Adds a new {@link CompletableEvent event} to track for later completion/expiration.
+ *
+ * @param event Event to track
+ */
+public void add(CompletableEvent<?> event) {
+tracked.add(Objects.requireNonNull(event, "Event to track must be non-null"));
+}
+
+/**
+ * This method performs a two-step process to "complete" {@link CompletableEvent events} that have either expired
+ * or completed normally:
+ *
+ * <ol>
+ *     <li>
+ *         For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+ *         instance of {@link TimeoutException} is created and passed to
+ *         {@link CompletableFuture#completeExceptionally(Throwable)}.
+ *     </li>
+ *     <li>
+ *         For each tracked event of which its {@link CompletableEvent#future() future} is already in the
+ *         {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events.
+ *     </li>
+ * </ol>
+ *
+ * <p/>
+ *
+ * This method should be called at regular intervals, based upon the needs of the resource that owns the reaper.
+ *
+ * @param currentTimeMs Current time with which to compare against the
+ *  {@link CompletableEvent#deadlineMs() expiration time}
+ */
+public void reap(long currentTimeMs) {
+Consumer<CompletableEvent<?>> expireEvent = event -> {
+long pastDueMs = currentTimeMs - event.deadlineMs();
+TimeoutException error = new TimeoutException(String.format("%s was %s ms past its expiration of %s", event.getClass().getSimpleName(), pastDueMs, event.deadlineMs()));
+
+if (event.future().completeExceptionally(error)) {
+log.debug("Event {} completed exceptionally since its expiration of {} passed {} ms ago", event, event.deadlineMs(), pastDueMs);
+} else {
+log.trace("Event {} not completed exceptionally since it was previously completed", event);
+}
+};
+
+// First, complete (exceptionally) any events that have passed their deadline AND aren't already complete.
+tracked.stream()
+.filter(e -> !e.future().isDone())
+.filter(e -> currentTimeMs > e.deadlineMs())

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1607381004


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,14 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {
+// If we don't know the position of the offset on log 
segments, just pessimistically assume that we
+// only gained 1 byte when fetchOffset < endOffset, otherwise 
do nothing. This can happen when the
+// high-watermark is stale, but should be rare.
+if (fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment:
   We can organize the code a bit more clearly. I am thinking of something like the following.
   
   ```
   if (fetchOffset.messageOffset > endOffset.messageOffset) {
     // Case F, this can happen when the new fetch operation is on a truncated leader
     debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
     return forceComplete()
   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
     if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
       // If we don't know the position of the offset on log segments, just pessimistically assume that we
       // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
       // high-watermark is stale, but should be rare.
       accumulatedSize += 1
     } else if (fetchOffset.onOlderSegment(endOffset)) {
       // Case F, this can happen when the fetch operation is falling behind the current segment
       // or the partition has just rolled a new segment
       debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
       // We will not force complete the fetch request if a replica should be throttled.
       if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
         return forceComplete()
     } else if (fetchOffset.onSameSegment(endOffset)) {
       // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
       val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
       if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
         accumulatedSize += bytesAvailable
     }
   }
   
   ```



##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithInvalidHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contains the complete metadata

Review Comment:
   does not contains => does not contain



