Re: [PR] Kafka-15444: Native docker image for Apache Kafka(KIP-974) [kafka]
omkreddy merged PR #15927: URL: https://github.com/apache/kafka/pull/15927 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Aesthetic, Uniformity Changes and Reducing warnings [kafka]
rreddy-22 opened a new pull request, #16026: URL: https://github.com/apache/kafka/pull/16026 Went through all the files in the group coordinator module and fixed up a few minor warnings and cleaned up a few leftover errors. Also fixed the formatting to make it more uniform
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951: URL: https://github.com/apache/kafka/pull/15951#discussion_r1609228927
## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
Review Comment:
> However, adding alter thread (replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)) is not in this check. Is it possible that the alter thread, which is invoked by another thread, just removes the future log and then this thread adds the topic partition to replicaAlterLogDirsManager? It seems to me that the alter thread will fail because the future log of the partition is gone.

That's possible, but I think it's fine, because the future log can be removed for one of two reasons:
1. The alter-logDir operation completed. In this case, a new LeaderAndIsr request or topic partition update will be applied, and this fetcher will then be removed in `ReplicaManager#makeLeader` or `makeFollower`.
2. Another log failure happened. In this case, `createLogIfNotExists` will fail, too.
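A minimal Java model of why the PR gates `abortAndPauseCleaning` on the future log not already existing. It assumes, as the `LogCleaningPaused(count)` state in `LogCleanerManager` suggests, that cleaner pauses are reference-counted, so one unmatched extra pause keeps cleaning paused after the move finishes. `PauseCounter`, `PartitionModel`, and their methods are hypothetical, not Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of reference-counted cleaner pauses.
// Assumption: every pause must be matched by one resume before cleaning restarts.
class PauseCounter {
    private final Map<String, Integer> paused = new HashMap<>();

    void pause(String partition) {
        paused.merge(partition, 1, Integer::sum);
    }

    void resume(String partition) {
        // Decrement the count; drop the entry (unpause) when it reaches zero.
        paused.computeIfPresent(partition, (p, n) -> n > 1 ? n - 1 : null);
    }

    boolean isPaused(String partition) {
        return paused.containsKey(partition);
    }
}

// Hypothetical partition that holds at most one future (moving) log.
class PartitionModel {
    private final String name;
    private boolean futureLogExists = false;

    PartitionModel(String name) {
        this.name = name;
    }

    /** Creates the future log only if absent; returns true if this call created it. */
    boolean maybeCreateFutureLog() {
        if (futureLogExists) {
            return false;
        }
        futureLogExists = true;
        return true;
    }

    /** Pause cleaning only when this call actually created the future log. */
    void handleAlterDir(PauseCounter cleaner) {
        if (maybeCreateFutureLog()) {
            cleaner.pause(name);
        }
    }

    /** When the move finishes: drop the future log and resume exactly once. */
    void completeMove(PauseCounter cleaner) {
        futureLogExists = false;
        cleaner.resume(name);
    }
}
```

If `handleAlterDir` paused unconditionally (the shape removed in this diff), a repeated request would raise the count to 2, and the single `resume` on completion would leave the partition paused.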
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2123824613 @junrao @showuon Thanks for the review! Addressed all the review comments. PTAL.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227442
## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
 
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-      else {
+      } else if (startOffset > maxOffsetMetadata.messageOffset ||
+        maxOffsetMetadata.messageOffsetOnly() ||
+        maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+        // We need to be careful before reading the segment as `maxOffsetMetadata` may not be a complete metadata:
+        // 1. If maxOffsetMetadata is message-offset-only, then return empty fetchDataInfo since
+        // maxOffsetMetadata.offset is not on local log segments.
+        // 2. If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, then return empty fetchDataInfo.
Review Comment:
ack, kept the same behavior.
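The guard in this hunk short-circuits to an empty fetch whenever `maxOffsetMetadata` cannot safely bound a read of the segment. Just that decision can be sketched in Java as below, assuming a simplified metadata type in which a segment base offset of -1 marks "message-offset-only" metadata. `OffsetMeta`, `ReadGuard`, and `UNKNOWN` are hypothetical stand-ins, not the real `LogOffsetMetadata` API.

```java
// Hypothetical stand-in for LogOffsetMetadata with only the fields the guard needs.
record OffsetMeta(long messageOffset, long segmentBaseOffset) {
    static final long UNKNOWN = -1L;

    // Assumption: metadata is "message-offset-only" when its segment is unknown.
    boolean messageOffsetOnly() {
        return segmentBaseOffset == UNKNOWN;
    }
}

final class ReadGuard {
    private ReadGuard() {}

    /** True when the read should return an empty result instead of scanning the segment. */
    static boolean returnEmpty(long startOffset, OffsetMeta maxOffsetMetadata, long segmentBaseOffset) {
        if (startOffset == maxOffsetMetadata.messageOffset()) {
            return true; // nothing to read below the upper bound
        }
        return startOffset > maxOffsetMetadata.messageOffset()          // reading past the bound
            || maxOffsetMetadata.messageOffsetOnly()                    // bound not on a local segment
            || maxOffsetMetadata.segmentBaseOffset() < segmentBaseOffset; // bound in an earlier segment
    }
}
```

With a bound of offset 100 in segment 80, only a start offset below 100 that lives in segment 80 proceeds to a real read; every other combination above returns empty.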
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227180
## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
 
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
+      } else if (startOffset > maxOffsetMetadata.messageOffset ||
+        maxOffsetMetadata.messageOffsetOnly() ||
+        maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {
Review Comment:
Addressed with the latest commit 77f90f63408ebe5b3da5c3305ad2affb7901eff4
[jira] [Commented] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost
[ https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848436#comment-17848436 ]

Jianbin Chen commented on KAFKA-16662:
--

After I deleted all of the __cluster_metadata-0 directories, the problem no longer occurred when I started the cluster, but all my topic information was lost. Fortunately, this is just an offline test environment cluster. Based on this behavior, the problem is clearly caused by an incompatibility between the 3.5 metadata version and 3.7. This makes me afraid to attempt a rolling upgrade of the cluster. In the past, with ZooKeeper, upgrading brokers never caused problems like this!

> UnwritableMetadataException: Metadata has been lost
> ---
>
> Key: KAFKA-16662
> URL: https://issues.apache.org/jira/browse/KAFKA-16662
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.7.0
> Environment: Docker Image (bitnami/kafka:3.7.0) via Docker Compose
> Reporter: Tobias Bohn
> Priority: Major
> Attachments: log.txt
>
>
> Hello,
> First of all: I am new to this Jira and apologize if anything is set or specified incorrectly. Feel free to advise me.
> We currently have an error in our test system, which unfortunately I can't solve, because I couldn't find anything related to it. No solution could be found via the mailing list either.
> The error occurs when we want to start up a node. The node runs using KRaft and is both a controller and a broker. The following error message appears at startup:
> {code:java}
> kafka | [2024-04-16 06:18:13,707] ERROR Encountered fatal fault: Unhandled error initializing new publishers (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> kafka | org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been lost because the following could not be represented in metadata version 3.5-IV2: the directory assignment state of one or more replicas
> kafka | at org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
> kafka | at org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
> kafka | at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
> kafka | at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
> kafka | at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
> kafka | at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
> kafka | at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
> kafka | at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
> kafka | at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
> kafka | at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
> kafka | at java.base/java.lang.Thread.run(Thread.java:840)
> kafka exited with code 0 {code}
> We use Docker to operate the cluster. The error occurred while we were trying to restart a node. All other nodes in the cluster are still running correctly.
> If you need further information, please let us know. The complete log is attached to this issue.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
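The stack trace shows `PartitionRegistration.toRecord` calling `ImageWriterOptions.handleLoss` because the finalized metadata version (3.5-IV2) has no way to encode per-replica directory assignments. The Java sketch below is a simplified model of that "lossy write" check, not Kafka's code: it assumes a placeholder directory ID means "no real assignment" and that the default loss handler treats loss as fatal. `LossyWriter`, `supportsDirectories`, `failOnLoss`, and `PLACEHOLDER` are hypothetical names.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of writing a partition record at an older metadata version.
final class LossyWriter {
    // Assumption: a placeholder ID marks replicas with no real directory assignment.
    static final String PLACEHOLDER = "UNASSIGNED-PLACEHOLDER";

    private final boolean supportsDirectories;
    private final Consumer<String> lossHandler;

    LossyWriter(boolean supportsDirectories, Consumer<String> lossHandler) {
        this.supportsDirectories = supportsDirectories;
        this.lossHandler = lossHandler;
    }

    /** Returns the directories that would be written; reports a loss if real ones must be dropped. */
    List<String> writeDirectories(List<String> directories) {
        if (supportsDirectories) {
            return directories;
        }
        // The older version has no field for directories, so real assignments cannot be kept.
        boolean hasRealAssignment = directories.stream().anyMatch(d -> !PLACEHOLDER.equals(d));
        if (hasRealAssignment) {
            lossHandler.accept("the directory assignment state of one or more replicas");
        }
        return List.of();
    }

    /** Handler that treats any loss as fatal, mirroring the fatal fault in the log. */
    static Consumer<String> failOnLoss(String version) {
        return loss -> {
            throw new IllegalStateException("Metadata has been lost because the following could not be "
                + "represented in metadata version " + version + ": " + loss);
        };
    }
}
```

In this model, the only ways to avoid the failure are a version that can represent directories or partitions that carry no real assignment.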
Re: [PR] KAFKA-16783: Migrate RemoteLogMetadataManagerTest to new test infra [kafka]
chia7712 commented on PR #15983: URL: https://github.com/apache/kafka/pull/15983#issuecomment-2123808122
> I think we can file another PR to improve it: either we add a future.get(30 sec), or we set a global @Timeout annotation on the test class to limit the waiting time.

I love the idea of a global timeout. I will file a Jira for it.
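The quoted suggestion names two ways to stop a test from hanging: a bounded `future.get(30 sec)` per call, or a class-level timeout (in JUnit 5 that role is played by the `@Timeout` annotation). The first option needs only the JDK and can be sketched with `CompletableFuture.get(long, TimeUnit)`; `BoundedWait.awaitOrFail` is a hypothetical helper name, not an existing Kafka test utility.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWait {
    private BoundedWait() {}

    /** Waits at most the given time for the future, failing the test instead of hanging. */
    static <T> T awaitOrFail(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new AssertionError("Timed out after " + timeout + " " + unit, e);
        } catch (ExecutionException e) {
            throw new AssertionError("Future failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new AssertionError("Interrupted while waiting", e);
        }
    }
}
```

A per-call bound has to be repeated at every wait site, which is one reason a single class-level timeout is attractive.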
[jira] [Commented] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost
[ https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848434#comment-17848434 ]

Jianbin Chen commented on KAFKA-16662:
--

When I executed ./bin/kafka-features.sh --bootstrap-server 10.58.16.231:9092 upgrade --metadata 3.7, the following errors were logged continuously:
{panel:title=My Title}
[2024-05-22 11:30:36,491] INFO [UnifiedLog partition=remote-test-5, dir=/data01/kafka-logs-351] Incremented log start offset to 26267689 due to leader offset increment (kafka.log.UnifiedLog)
[2024-05-22 11:30:36,497] INFO [UnifiedLog partition=remote-test2-0, dir=/data01/kafka-logs-351] Incremented log start offset to 3099360 due to leader offset increment (kafka.log.UnifiedLog)
[2024-05-22 11:30:37,149] ERROR Failed to propagate directory assignments because the Controller returned error STALE_BROKER_EPOCH (org.apache.kafka.server.AssignmentsManager)
[2024-05-22 11:30:38,064] ERROR Failed to propagate directory assignments because the Controller returned error STALE_BROKER_EPOCH (org.apache.kafka.server.AssignmentsManager)
[2024-05-22 11:30:39,376] ERROR Failed to propagate directory assignments because the Controller returned error STALE_BROKER_EPOCH (org.apache.kafka.server.AssignmentsManager)
[2024-05-22 11:30:41,486] ERROR Failed to propagate directory assignments because the Controller returned error STALE_BROKER_EPOCH (org.apache.kafka.server.AssignmentsManager)
[2024-05-22 11:30:43,794] INFO [BrokerLifecycleManager id=3] Unable to register broker 3 because the controller returned error INVALID_REGISTRATION (kafka.server.BrokerLifecycleManager)
[2024-05-22 11:30:45,224] ERROR Failed to propagate directory assignments because the Controller returned error STALE_BROKER_EPOCH (org.apache.kafka.server.AssignmentsManager)
{panel}
Controller logs:
{code:java}
java.util.concurrent.CompletionException: org.apache.kafka.common.errors.StaleBrokerEpochException: Expected broker epoch 41885255, but got broker epoch -1
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
    at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:880)
    at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:148)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:137)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.common.errors.StaleBrokerEpochException: Expected broker epoch 41885255, but got broker epoch -1
{code}
[jira] [Resolved] (KAFKA-16783) Migrate RemoteLogMetadataManagerTest to new test infra
[ https://issues.apache.org/jira/browse/KAFKA-16783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-16783.
---
Fix Version/s: 3.8.0
Resolution: Fixed

> Migrate RemoteLogMetadataManagerTest to new test infra
> --
>
> Key: KAFKA-16783
> URL: https://issues.apache.org/jira/browse/KAFKA-16783
> Project: Kafka
> Issue Type: Test
> Reporter: Chia-Ping Tsai
> Assignee: PoAn Yang
> Priority: Minor
> Labels: storage_test
> Fix For: 3.8.0
>
>
> as title
> `TopicBasedRemoteLogMetadataManagerWrapperWithHarness` could be replaced by `RemoteLogMetadataManagerTestUtils#builder`
Re: [PR] KAFKA-16783: Migrate RemoteLogMetadataManagerTest to new test infra [kafka]
showuon merged PR #15983: URL: https://github.com/apache/kafka/pull/15983
[jira] [Commented] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost
[ https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848431#comment-17848431 ]

Jianbin Chen commented on KAFKA-16662:
--

Could someone please pay attention to this issue and help me out?
{code:java}
[admin@kafka-dev-d-010058016231 kafka]$ ./bin/kafka-features.sh --bootstrap-server 10.58.16.231:9092 describe
Feature: metadata.version  SupportedMinVersion: 3.0-IV1  SupportedMaxVersion: 3.7-IV4  FinalizedVersionLevel: 3.5-IV2  Epoch: 41885646
{code}
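The `describe` output shows that the brokers support up to 3.7-IV4 while the finalized `metadata.version` is still 3.5-IV2, i.e. the upgrade to 3.7 was never finalized. A small Java sketch that parses a line in this format and flags that gap, assuming every version string has the form "X.Y-IVn" and compares by major, minor, then IV number. `FeatureStatus` is a hypothetical helper, not part of Kafka's tooling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that parses one line of `kafka-features.sh describe` output.
final class FeatureStatus {
    private static final Pattern FIELD = Pattern.compile("(\\w+): (\\S+)");
    private static final Pattern VERSION = Pattern.compile("(\\d+)\\.(\\d+)-IV(\\d+)");

    private FeatureStatus() {}

    /** Collects "Key: value" pairs from the line. */
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new HashMap<>();
        Matcher m = FIELD.matcher(line);
        while (m.find()) {
            fields.put(m.group(1), m.group(2));
        }
        return fields;
    }

    /** Compares "X.Y-IVn" strings by major, minor, then IV number. */
    static int compareVersions(String a, String b) {
        int[] x = toParts(a);
        int[] y = toParts(b);
        for (int i = 0; i < 3; i++) {
            if (x[i] != y[i]) {
                return Integer.compare(x[i], y[i]);
            }
        }
        return 0;
    }

    private static int[] toParts(String v) {
        Matcher m = VERSION.matcher(v);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected version format: " + v);
        }
        return new int[] {Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)), Integer.parseInt(m.group(3))};
    }

    /** True when the finalized level is below the highest level the binaries support. */
    static boolean finalizedLagsSupported(String describeLine) {
        Map<String, String> f = parse(describeLine);
        return compareVersions(f.get("FinalizedVersionLevel"), f.get("SupportedMaxVersion")) < 0;
    }
}
```

A lagging level is not an error by itself; it only means the newer metadata version has not yet been finalized, for example with the `kafka-features.sh ... upgrade --metadata 3.7` command quoted elsewhere in this thread.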
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
artemlivshits commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1609158284
## core/src/main/scala/kafka/server/BrokerFeatures.scala: ##
@@ -75,16 +75,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
-    Features.supportedFeatures(
-      java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+    val features = new util.HashMap[String, SupportedVersionRange]()
+    features.put(MetadataVersion.FEATURE_NAME,
         new SupportedVersionRange(
           MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
           if (unstableMetadataVersionsEnabled) {
             MetadataVersion.latestTesting.featureLevel
           } else {
             MetadataVersion.latestProduction.featureLevel
-          }
-        )))
+          }))
+    org.apache.kafka.server.common.Features.PRODUCTION_FEATURES.forEach { feature =>
Review Comment:
Is there a reason this uses the fully qualified class name instead of an import, as is usually done?
## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ##
@@ -16,72 +16,135 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ *
+ * Having a unified enum for the features that will use a shared type in the API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+    /**
+     * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when
+     * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here.
+     *
+     * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature.
+     */
+    TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false);
 
-public final class Features {
-    private final MetadataVersion version;
-    private final Map finalizedFeatures;
-    private final long finalizedFeaturesEpoch;
+    public static final Features[] FEATURES;
+    public static final List PRODUCTION_FEATURES;
+    private final String name;
+    private final FeatureVersion[] features;
+    private final CreateMethod createFeatureVersionMethod;
+    private final boolean usedInProduction;
 
-    public static Features fromKRaftVersion(MetadataVersion version) {
-        return new Features(version, Collections.emptyMap(), -1, true);
+    Features(String name,
+             FeatureVersion[] features,
+             CreateMethod createMethod,
+             boolean usedInProduction) {
+        this.name = name;
+        this.features = features;
+        this.createFeatureVersionMethod = createMethod;
+        this.usedInProduction = usedInProduction;
     }
 
-    public Features(
-        MetadataVersion version,
-        Map finalizedFeatures,
-        long finalizedFeaturesEpoch,
-        boolean kraftMode
-    ) {
-        this.version = version;
-        this.finalizedFeatures = new HashMap<>(finalizedFeatures);
-        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-        // In KRaft mode, we always include the metadata version in the features map.
-        // In ZK mode, we never include it.
-        if (kraftMode) {
-            this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
-        } else {
-            this.finalizedFeatures.remove(FEATURE_NAME);
-        }
+    static {
+        Features[] enumValues = Features.values();
+        FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+        PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+            feature.usedInProduction).collect(Collectors.toList());
     }
 
-    public MetadataVersion metadataVersion() {
-        return version;
+    public String featureName() {
+        return name;
     }
 
-    public Map finalizedFeatures() {
-        return finalizedFeatures;
+    public FeatureVersion[] features() {
+        return features;
     }
 
-    public long finalizedFeaturesEpoch() {
-        return finalizedFeaturesEpoch;
+    /**
+     * Creates a FeatureVersion
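The new `Features` enum carries a `usedInProduction` flag per constant and precomputes `PRODUCTION_FEATURES`, which `BrokerFeatures.defaultSupportedFeatures` then iterates next to the `metadata.version` entry. The body of that Scala `forEach` is cut off in this message, so the merge step in this self-contained Java sketch is an assumption, as are `LevelRange`, `FeatureSketch`, `PROD_VERSION`, `SupportedFeatures`, and the level numbers.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical min..max range of feature levels (stand-in for SupportedVersionRange).
record LevelRange(short min, short max) {}

// Enum-of-features pattern modeled on the diff: each constant carries a production flag.
enum FeatureSketch {
    TEST_VERSION("test.feature.version", (short) 1, false),
    PROD_VERSION("prod.feature.version", (short) 2, true); // hypothetical production feature

    // Computed once, after all constants exist (same idea as the static block in the diff).
    static final List<FeatureSketch> PRODUCTION_FEATURES = Arrays.stream(values())
        .filter(f -> f.usedInProduction)
        .collect(Collectors.toList());

    private final String featureName;
    private final short latestLevel;
    private final boolean usedInProduction;

    FeatureSketch(String featureName, short latestLevel, boolean usedInProduction) {
        this.featureName = featureName;
        this.latestLevel = latestLevel;
        this.usedInProduction = usedInProduction;
    }

    String featureName() {
        return featureName;
    }

    short latestLevel() {
        return latestLevel;
    }
}

final class SupportedFeatures {
    private SupportedFeatures() {}

    // Assumption: metadata.version keeps its own range; production features advertise 0..latest.
    static Map<String, LevelRange> defaults(short mvMin, short mvMax) {
        Map<String, LevelRange> features = new HashMap<>();
        features.put("metadata.version", new LevelRange(mvMin, mvMax));
        for (FeatureSketch f : FeatureSketch.PRODUCTION_FEATURES) {
            features.put(f.featureName(), new LevelRange((short) 0, f.latestLevel()));
        }
        return features;
    }
}
```

On the review question about the fully qualified name: `BrokerFeatures.scala` presumably already uses a different `Features` class (from `org.apache.kafka.common.feature`) in the `Features[SupportedVersionRange]` return type, so a plain import would clash; a Scala renaming import such as `import org.apache.kafka.server.common.{Features => ServerFeatures}` is one alternative.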
Re: [PR] KAFKA-16783: Migrate RemoteLogMetadataManagerTest to new test infra [kafka]
showuon commented on PR #15983: URL: https://github.com/apache/kafka/pull/15983#issuecomment-2123794220 Failed tests are unrelated.
[jira] [Comment Edited] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost
[ https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848424#comment-17848424 ]

Jianbin Chen edited comment on KAFKA-16662 at 5/22/24 3:10 AM:
---

I have encountered the same issue. Can anyone help me with this? I upgraded from 3.5.1 to 3.7.0. I had already changed inter.broker.protocol.version to 3.7 and run the cluster that way for some time, but I never executed
{code:java}
./bin/kafka-features.sh upgrade --metadata 3.7
{code}
The last time I restarted the cluster, it could no longer start. The last lines of the log are as follows:
{code:java}
[2024-05-22 11:01:41,087] INFO [MetadataLoader id=3] maybePublishMetadata(LOG_DELTA): The loader is still catching up because we have loaded up to offset 41872530, but the high water mark is 41872532 (org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,088] INFO [MetadataLoader id=3] maybePublishMetadata(LOG_DELTA): The loader finished catching up to the current high water mark of 41872532 (org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,092] INFO [BrokerLifecycleManager id=3] The broker has caught up. Transitioning from STARTING to RECOVERY. (kafka.server.BrokerLifecycleManager)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Finished waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)
[2024-05-22 11:01:41,095] ERROR Encountered fatal fault: Unhandled error initializing new publishers (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been lost because the following could not be represented in metadata version 3.5-IV2: the directory assignment state of one or more replicas
    at org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
    at org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
    at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
    at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
    at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
    at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
    at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:1583)
{code}
Re: [PR] KAFKA-16771 First log directory printed twice when formatting storage [kafka]
gongxuanzhang commented on code in PR #16010: URL: https://github.com/apache/kafka/pull/16010#discussion_r1609198047
## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -452,19 +452,20 @@ object StorageTool extends Logging {
       stream.println("All of the log directories are already formatted.")
     } else {
       metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => {
-        copier.setLogDirProps(logDir, new MetaProperties.Builder(metaProperties).
-          setDirectoryId(copier.generateValidDirectoryId()).
+        val loggingCopier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
+        loggingCopier.setLogDirProps(logDir, new MetaProperties.Builder(metaProperties).
+          setDirectoryId(loggingCopier.generateValidDirectoryId()).
           build())
-        copier.setPreWriteHandler((logDir, _, _) => {
+        loggingCopier.setPreWriteHandler((logDir, _, _) => {
           stream.println(s"Formatting $logDir with metadata.version $metadataVersion.")
           Files.createDirectories(Paths.get(logDir))
           val bootstrapDirectory = new BootstrapDirectory(logDir, Optional.empty())
           bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
         })
-        copier.setWriteErrorHandler((logDir, e) => {
+        loggingCopier.setWriteErrorHandler((logDir, e) => {
           throw new TerseFailure(s"Error while writing meta.properties file $logDir: ${e.getMessage}")
         })
-        copier.writeLogDirChanges()
Review Comment:
If we just move this line out of the loop, the directories are printed in reverse order. I have added test cases. Please review, @chia7712.
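One plausible mechanism behind "First log directory printed twice" is that a single shared copier accumulates every directory registered in the loop and writes all of them on each `writeLogDirChanges()` call. The Java model below illustrates that assumption and the per-directory fix in this diff; `PendingWriter` and its methods are hypothetical and do not reproduce `MetaPropertiesEnsemble.Copier`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical writer that remembers every directory registered with it.
final class PendingWriter {
    private final Set<String> pending = new LinkedHashSet<>();
    private final List<String> printed;

    PendingWriter(List<String> printed) {
        this.printed = printed;
    }

    void add(String logDir) {
        pending.add(logDir);
    }

    // Assumption: each call writes (and prints) every directory registered so far.
    void writeAll() {
        for (String dir : pending) {
            printed.add("Formatting " + dir);
        }
    }

    /** Buggy shape: one shared writer, flushed inside the loop. */
    static List<String> sharedWriter(List<String> dirs) {
        List<String> out = new ArrayList<>();
        PendingWriter writer = new PendingWriter(out);
        for (String dir : dirs) {
            writer.add(dir);
            writer.writeAll(); // re-writes earlier directories as well
        }
        return out;
    }

    /** Fixed shape: a fresh writer per directory, as in the PR. */
    static List<String> writerPerDir(List<String> dirs) {
        List<String> out = new ArrayList<>();
        for (String dir : dirs) {
            PendingWriter writer = new PendingWriter(out);
            writer.add(dir);
            writer.writeAll();
        }
        return out;
    }
}
```

With two directories, the shared writer prints the first one twice, while one writer per directory prints each directory once in loop order.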
[jira] [Commented] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost
[ https://issues.apache.org/jira/browse/KAFKA-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848424#comment-17848424 ] Jianbin Chen commented on KAFKA-16662:
--

I have encountered the same issue. Can anyone help me with this? I upgraded from 3.5.1 to 3.7.0, changed inter.broker.protocol.version to 3.7, and ran the cluster for some time, but I never executed `./bin/kafka-features.sh upgrade --metadata 3.7`. The last time I restarted the cluster, it could no longer start. The last lines of the log are as follows:

```
[2024-05-22 11:01:41,087] INFO [MetadataLoader id=3] maybePublishMetadata(LOG_DELTA): The loader is still catching up because we have loaded up to offset 41872530, but the high water mark is 41872532 (org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,088] INFO [MetadataLoader id=3] maybePublishMetadata(LOG_DELTA): The loader finished catching up to the current high water mark of 41872532 (org.apache.kafka.image.loader.MetadataLoader)
[2024-05-22 11:01:41,092] INFO [BrokerLifecycleManager id=3] The broker has caught up. Transitioning from STARTING to RECOVERY. (kafka.server.BrokerLifecycleManager)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Finished waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)
[2024-05-22 11:01:41,092] INFO [BrokerServer id=3] Waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)
[2024-05-22 11:01:41,095] ERROR Encountered fatal fault: Unhandled error initializing new publishers (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been lost because the following could not be represented in metadata version 3.5-IV2: the directory assignment state of one or more replicas
	at org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
	at org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
	at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
	at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
	at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
	at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
	at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
	at java.base/java.lang.Thread.run(Thread.java:1583)
```

> UnwritableMetadataException: Metadata has been lost
> ---
>
> Key: KAFKA-16662
> URL: https://issues.apache.org/jira/browse/KAFKA-16662
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.7.0
> Environment: Docker Image (bitnami/kafka:3.7.0)
> via Docker Compose
> Reporter: Tobias Bohn
> Priority: Major
> Attachments: log.txt
>
>
> Hello,
> First of all: I am new to this Jira and apologize if anything is set or
> specified incorrectly. Feel free to advise me.
> We currently have an error in our test system, which unfortunately I can't
> solve, because I couldn't find anything related to it. No solution could be
> found via the mailing list either.
> The error occurs when we want to start up a node. The node runs using Kraft
> and is both a controller and a broker. The following error message appears at
> startup:
> {code:java}
> kafka | [2024-04-16 06:18:13,707] ERROR Encountered fatal fault: Unhandled
> error initializing new publishers
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> kafka | org.apache.kafka.image.writer.UnwritableMetadataException: Metadata
> has been lost because the following could not be represented in metadata
> version 3.5-IV2: the directory assignment state of one or more replicas
> kafka | at
> org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
> kafka | at
> org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
> kafka | at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
> kafka | at
> org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
> kafka | at
> org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
> kafka | at
> org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
> kafka | at
>
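The stack trace shows the loss path. `PartitionRegistration.toRecord` reports, through `ImageWriterOptions.handleLoss`, state that metadata version 3.5-IV2 cannot represent, and the handler in use throws. The Java sketch below is a simplified, hypothetical model of that pattern. Every class and method name is invented, and the boolean capability flag stands in for whatever version check Kafka actually performs. Under those assumptions, it illustrates why partitions that already carry directory assignments make startup fail while the finalized metadata.version is still 3.5-IV2.

```java
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model of the loss-reporting pattern in the stack trace.
// None of these names are Kafka APIs.
class MetadataLossSketch {
    // Stand-in for UnwritableMetadataException.
    static class UnwritableMetadata extends RuntimeException {
        UnwritableMetadata(String version, String loss) {
            super("Metadata has been lost because the following could not be represented in metadata version "
                + version + ": " + loss);
        }
    }

    // Stand-in for ImageWriterOptions: target version, one capability flag, and a loss handler.
    static class WriterOptions {
        final String metadataVersion;
        final boolean supportsDirectoryAssignment; // assumption: modeled as a simple per-version flag
        final Consumer<String> lossHandler;

        WriterOptions(String metadataVersion, boolean supportsDirectoryAssignment, Consumer<String> lossHandler) {
            this.metadataVersion = metadataVersion;
            this.supportsDirectoryAssignment = supportsDirectoryAssignment;
            this.lossHandler = lossHandler;
        }
    }

    // Stand-in for PartitionRegistration.toRecord. Assumption: an empty list means
    // no directory assignment was recorded for the partition's replicas.
    static String toRecord(List<String> replicaDirectories, WriterOptions options) {
        if (!options.supportsDirectoryAssignment && !replicaDirectories.isEmpty()) {
            options.lossHandler.accept("the directory assignment state of one or more replicas");
            return "PartitionRecord(directories=[])"; // reached only if the handler tolerates the loss
        }
        return "PartitionRecord(directories=" + replicaDirectories + ")";
    }

    // A handler that treats any loss as fatal, like the startup failure in the log.
    static WriterOptions strict(String version, boolean supportsDirectoryAssignment) {
        return new WriterOptions(version, supportsDirectoryAssignment,
            loss -> { throw new UnwritableMetadata(version, loss); });
    }

    public static void main(String[] args) {
        try {
            toRecord(List.of("dir-1"), strict("3.5-IV2", false));
        } catch (UnwritableMetadata e) {
            System.out.println(e.getMessage());
        }
    }
}
```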
Re: [PR] KAFKA-16669: Remove extra collection copy when generating DescribeAclsResource [kafka]
showuon commented on code in PR #15924: URL: https://github.com/apache/kafka/pull/15924#discussion_r1609187500 ## core/src/main/scala/kafka/server/AclApis.scala: ## @@ -69,7 +69,7 @@ class AclApis(authHelper: AuthHelper, case Some(auth) => val filter = describeAclsRequest.filter val returnedAcls = new util.HashSet[AclBinding]() -auth.acls(filter).forEach(returnedAcls.add) +returnedAcls.add(auth.acls(filter).iterator().next()) Review Comment: OK, I see why you made this change now. > we don't need to collect all items to a new collection, right? I believe what @chia7712 meant is that we don't need a collection copy here, because the `aclsResource` method already accepts an `Iterable`. To be clear, in the JIRA we said: > 1. Iterable -> HashSet This is the collection copy we want to avoid. So we don't need `returnedAcls` anymore; we can pass the result of `auth.acls(filter)` directly into `aclsResource`. Is that clear? ## core/src/main/scala/kafka/server/AclApis.scala: ## @@ -69,7 +69,7 @@ class AclApis(authHelper: AuthHelper, case Some(auth) => val filter = describeAclsRequest.filter val returnedAcls = new util.HashSet[AclBinding]() -auth.acls(filter).forEach(returnedAcls.add) +returnedAcls.add(auth.acls(filter).iterator().next()) Review Comment: Originally, we added all of the `auth.acls(filter)` content to `returnedAcls`, but after this change we only add the first element. Why do we need this change?
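Here is a hedged Java sketch that makes the difference concrete. Plain strings stand in for `AclBinding`, and the method names are hypothetical. `aclsResourceCount` is only a stand-in for `aclsResource`, which the review says already accepts an `Iterable`. The sketch contrasts the original copy, the change under review, and passing the `Iterable` straight through.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch with String standing in for AclBinding; not Kafka code.
class AclCopySketch {
    // Change under review: only the first element is kept, and an empty
    // Iterable makes iterator().next() throw NoSuchElementException.
    static Set<String> firstOnly(Iterable<String> acls) {
        Set<String> returned = new HashSet<>();
        returned.add(acls.iterator().next());
        return returned;
    }

    // Original code: copies every element into a new HashSet
    // (the "Iterable -> HashSet" copy the JIRA wants to avoid).
    static Set<String> copyAll(Iterable<String> acls) {
        Set<String> returned = new HashSet<>();
        acls.forEach(returned::add);
        return returned;
    }

    // Suggested direction: hand the Iterable straight to the consumer.
    // Hypothetical stand-in for aclsResource; it just counts what it iterates.
    static int aclsResourceCount(Iterable<String> acls) {
        int count = 0;
        for (String ignored : acls) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> acls = List.of("User:alice READ topic-a", "User:bob WRITE topic-b");
        System.out.println(firstOnly(acls).size() + " vs " + copyAll(acls).size() + " vs " + aclsResourceCount(acls));
    }
}
```

The change under review drops every element after the first. `iterator().next()` also throws `NoSuchElementException` when the filter matches no ACLs, so an empty result would fail too.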
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException +* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata +* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { Review Comment: > However, if nextOffsetMetadata doesn't change and somehow startOffset > maxOffsetMetadata.messageOffset, then we could loop forever. I still don't see how the infinite loop could happen. In `LocalLog#read`, we check whether `startOffset > nextOffsetMetadata.messageOffset`, and if so, an `OffsetOutOfRangeException` is thrown.
```
def read(startOffset: Long,
         maxLength: Int,
         minOneMessage: Boolean,
         maxOffsetMetadata: LogOffsetMetadata,
         includeAbortedTxns: Boolean): FetchDataInfo = {
  // [Luke] We will check if startOffset > nextOffsetMetadata.messageOffset (endOffset),
  // so if the provided `maxOffsetMetadata == nextOffsetMetadata`,
  // the `OffsetOutOfRangeException` should be thrown before entering another `convertToOffsetMetadataOrThrow` loop
  val endOffsetMetadata = nextOffsetMetadata
  val endOffset = endOffsetMetadata.messageOffset
  var segmentOpt = segments.floorSegment(startOffset)

  // return error on attempt to read beyond the log end offset
  if (startOffset > endOffset || !segmentOpt.isPresent)
    throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, but we only have log segments upto $endOffset.")

  if (startOffset == maxOffsetMetadata.messageOffset)
    emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
  // [Luke] So for the first time, the provided `maxOffsetMetadata` might be highWaterMark or LastStableOffset,
  // it could enter this `else if` condition because HWM/LSO should always be less than or equal to LEO. But the 2nd time won't enter here.
  else if (startOffset > maxOffsetMetadata.messageOffset)
    emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
```
So a `LocalLog#read` call with `maxOffsetMetadata = nextOffsetMetadata` can never enter another `convertToOffsetMetadataOrThrow` loop. In that call, `if (startOffset > maxOffsetMetadata.messageOffset)` is the same check as `if (startOffset > endOffset)`, which has already thrown. If it did happen, it would mean `logEndOffset < highWaterMarkOffset` or `logEndOffset < lastStableOffset`, which I don't think can happen. Does that make sense?
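Luke's argument can be checked with a small Java model of the branch order in the quoted `LocalLog#read`. This is a simplification, not Kafka code. Offsets are plain longs, and the log end offset is held fixed, which is the scenario in the quoted concern where `nextOffsetMetadata` doesn't change. `convertToOffsetMetadataOrThrow` is modeled as a nested read whose upper bound is the log end offset. The return value counts how many nested reads happened.

```java
// Simplified model of the branch order in the quoted LocalLog#read; not Kafka code.
class ReadGuardSketch {
    static final class OffsetOutOfRange extends RuntimeException {
        OffsetOutOfRange(String message) {
            super(message);
        }
    }

    // Returns the number of nested reads made before a result is produced.
    // Assumptions: offsets are plain longs, logEndOffset stays fixed, and
    // convertToOffsetMetadataOrThrow is modeled as a nested read bounded by logEndOffset.
    static int read(long startOffset, long maxOffset, long logEndOffset, int depth) {
        // "return error on attempt to read beyond the log end offset"
        if (startOffset > logEndOffset) {
            throw new OffsetOutOfRange("offset " + startOffset + " > log end offset " + logEndOffset);
        }
        if (startOffset == maxOffset) {
            return depth; // emptyFetchDataInfo(maxOffsetMetadata, ...)
        }
        if (startOffset > maxOffset) {
            // convertToOffsetMetadataOrThrow(startOffset): a read with maxOffset = log end offset
            return read(startOffset, logEndOffset, logEndOffset, depth + 1);
        }
        return depth; // normal read path
    }

    public static void main(String[] args) {
        // HWM = 8, LEO = 12: a fetch at offset 10 takes the convert branch once.
        System.out.println(read(10, 8, 12, 0));
    }
}
```

Under these assumptions, the nesting depth never exceeds one. In the nested call, `startOffset > maxOffset` is the same test as `startOffset > logEndOffset`, and that test has already thrown. The model does not cover `nextOffsetMetadata` changing between calls.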
Re: [PR] KAFKA-16795: Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter [kafka]
brandboat commented on PR #16020: URL: https://github.com/apache/kafka/pull/16020#issuecomment-2123611516 > The benefit of this approach is that we don't need to add removed class back. WDYT? This one is way better, thanks!
[jira] [Commented] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
[ https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848404#comment-17848404 ] Greg Harris commented on KAFKA-16798: - Hi [~sektor.coder] thanks for the ticket! The log message that you're looking at is in WorkerSourceTask: [https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L235] It should be printed every offset.flush.interval.ms: [https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L75-L78] That periodic operation is only for writing the task "source offsets", which are used to store the replication progress. It is completely separate from the consumer group sync feature that it sounds like you're trying to configure. The analogous log message for that is this one in the MirrorCheckpointTask: [https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L387] If you're not seeing that log message, you might look at the OffsetSyncStore: [https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L113-L154] and in MirrorCheckpointTask: [https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L197-L224] for reasons why the offset translation/syncing might not be happening as you expect. 
> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> -
>
> Key: KAFKA-16798
> URL: https://issues.apache.org/jira/browse/KAFKA-16798
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.7.0
> Reporter: Thanos Athanasopoulos
> Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication
> logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i
> "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds
> "
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds*
> as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets]
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets]
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets]
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets]
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets]
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
>
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1609062958 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -105,12 +106,19 @@ private KafkaBasedLog createBackingStore(MirrorCheckpointConfig /** * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage. */ -public void start() { -backingStore.start(); +public void start(boolean initializationMustReadToEnd) { +this.initializationMustReadToEnd = initializationMustReadToEnd; +log.debug("OffsetSyncStore starting - must read to OffsetSync end = ", initializationMustReadToEnd); Review Comment: Noticed one typo here: the log message doesn't actually print the boolean, because the format string has no `{}` placeholder.
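The corrected call would presumably be `log.debug("OffsetSyncStore starting - must read to OffsetSync end = {}", initializationMustReadToEnd);`. The Java sketch below imitates SLF4J-style `{}` substitution with a hypothetical `format` helper to show why the argument disappears without a placeholder. It is not SLF4J itself, and it ignores escaping and SLF4J's special handling of a trailing `Throwable`.

```java
// Minimal imitation of SLF4J-style "{}" substitution; NOT SLF4J itself.
class PlaceholderSketch {
    // Each "{}" consumes the next argument. Arguments without a matching
    // placeholder are silently dropped, and unmatched "{}" are left as-is.
    static String format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int argIndex = 0;
        int from = 0;
        int at;
        while ((at = pattern.indexOf("{}", from)) >= 0 && argIndex < args.length) {
            sb.append(pattern, from, at).append(args[argIndex++]);
            from = at + 2;
        }
        sb.append(pattern.substring(from));
        return sb.toString();
    }

    public static void main(String[] args) {
        boolean initializationMustReadToEnd = true;
        // As written in the PR: no placeholder, so the flag never appears.
        System.out.println(format("OffsetSyncStore starting - must read to OffsetSync end = ", initializationMustReadToEnd));
        // With a placeholder, the flag is rendered.
        System.out.println(format("OffsetSyncStore starting - must read to OffsetSync end = {}", initializationMustReadToEnd));
    }
}
```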
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
junrao commented on PR #15673: URL: https://github.com/apache/kafka/pull/15673#issuecomment-2123590782 @clolov: The 3.8.0 feature freeze is about one week away. It would be useful to get this PR in before the 3.8 branch is cut. Any updates on this PR? What's an example of a failing test?
Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]
junrao commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1609025456

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -2095,7 +2101,8 @@ object UnifiedLog extends Logging {
}
/**
- * If the recordVersion is >= RecordVersion.V2, then create and return a LeaderEpochFileCache.
+ * If the recordVersion is >= RecordVersion.V2, then create a new LeaderEpochFileCache instance
+ * or update current cache if any with the new checkpoint and return it.

Review Comment:
How about changing it to the following? "If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance, loading the epoch entries from the backing checkpoint file or from the provided currentCache if it is not empty."

## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
@@ -42,10 +43,15 @@
 *
 * Leader Epoch = epoch assigned to each leader by the controller.
 * Offset = offset of the first message in each epoch.
+ *
+ * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flushes the epoch-entry changes to checkpoint asynchronously.

Review Comment:
flushes => flush

## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
@@ -390,7 +424,27 @@ public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint leaderEpochCheckpoint)
lock.readLock().lock();
try {
leaderEpochCheckpoint.write(epochEntries());
-return new LeaderEpochFileCache(topicPartition, leaderEpochCheckpoint);
+// We instantiate LeaderEpochFileCache after writing leaderEpochCheckpoint,
+// hence it is guaranteed that the new cache is consistent with the latest epoch entries.
+return new LeaderEpochFileCache(topicPartition, leaderEpochCheckpoint, scheduler);
+} finally {
+lock.readLock().unlock();
+}
+}
+
+/**
+ * Returns a new LeaderEpochFileCache which contains same
+ * epoch entries with replacing backing checkpoint

Review Comment:
checkpoint => checkpoint file

## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
@@ -313,14 +345,14 @@ public void truncateFromEnd(long endOffset) {
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);
-// We intentionally don't force flushing change to the device here because:
+// We flush the change to the device in the background because:

Review Comment:
It would be useful to explain in the comment that the reason async flush works is because the stale epochs always have more entries and no missing entries.

## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
@@ -55,16 +61,40 @@ public class LeaderEpochFileCache {
/**
 * @param topicPartition the associated topic partition
 * @param checkpoint the checkpoint file
+ * @param scheduler the scheduler to use for async I/O operations
 */
@SuppressWarnings("this-escape")
-public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) {
+public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint, Scheduler scheduler) {
this.checkpoint = checkpoint;
this.topicPartition = topicPartition;
+this.scheduler = scheduler;
LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] ");
log = logContext.logger(LeaderEpochFileCache.class);
checkpoint.read().forEach(this::assign);
}
+/**
+ * Instantiates a new LeaderEpochFileCache with replacing checkpoint with given one
+ * without restoring the cache from the checkpoint, with retaining the current epoch entries.

Review Comment:
How about the following? "Instantiate a new LeaderEpochFileCache with the provided epoch entries instead of from the backing checkpoint file. The provided epoch entries are expected to be no less fresh than the checkpoint file."
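The reviewer's argument for why an asynchronous flush is safe can be illustrated with a small model. Truncating from the end removes only a suffix of the epoch entries. A checkpoint written before the truncation therefore still contains every surviving entry, plus some extra ones at the end. A stale file of that kind can be corrected by truncating it again, whereas a file with missing entries could not be reconstructed. The sketch below uses a hypothetical `EpochEntry` record and a plain `List` in place of `LeaderEpochFileCache` and its checkpoint file. It demonstrates only this superset property, not Kafka's actual recovery path.

```java
import java.util.ArrayList;
import java.util.List;

public class EpochTruncationDemo {
    // Hypothetical stand-in for an epoch entry: (leader epoch, start offset).
    record EpochEntry(int epoch, long startOffset) {}

    // Models truncateFromEnd on entries sorted by start offset:
    // keep only the entries whose start offset is below endOffset.
    static List<EpochEntry> truncateFromEnd(List<EpochEntry> entries, long endOffset) {
        List<EpochEntry> kept = new ArrayList<>();
        for (EpochEntry e : entries) {
            if (e.startOffset() < endOffset) {
                kept.add(e);
            }
        }
        return kept;
    }

    // True if the stale snapshot starts with exactly the current entries,
    // i.e. it may have extra entries at the end but none missing.
    static boolean staleIsSupersetPrefix(List<EpochEntry> stale, List<EpochEntry> current) {
        return stale.size() >= current.size()
            && stale.subList(0, current.size()).equals(current);
    }

    public static void main(String[] args) {
        List<EpochEntry> onDisk = List.of(
            new EpochEntry(0, 0L), new EpochEntry(1, 100L), new EpochEntry(2, 250L));
        // In-memory state after truncation; the on-disk copy is flushed later.
        List<EpochEntry> inMemory = truncateFromEnd(onDisk, 200L);
        System.out.println(inMemory);
        System.out.println(staleIsSupersetPrefix(onDisk, inMemory));
    }
}
```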
Re: [PR] KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers [kafka]
cmccabe commented on PR #16006:
URL: https://github.com/apache/kafka/pull/16006#issuecomment-2123537218

Tests should be fixed now.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2123528226

@gharris1727 I gave up and used the ugly try. That warning is not occurring in every test... But I went all the way in `OffsetSyncStoreTest` as I prefer consistency to beauty. Removed a couple of warnings in MirrorCheckpointTaskTest too in the second commit
Re: [PR] KAFKA-16626: Lazily convert subscribed topic names to topic ids [kafka]
jeffkbkim commented on PR #15970:
URL: https://github.com/apache/kafka/pull/15970#issuecomment-2123510093

@dajac thanks for the review. I have addressed your comments
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608986312

## core/src/main/scala/kafka/log/LocalLog.scala:
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
s"but we only have log segments upto $endOffset.")
- if (startOffset == maxOffsetMetadata.messageOffset)
+ if (startOffset == maxOffsetMetadata.messageOffset) {
emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
- else if (startOffset > maxOffsetMetadata.messageOffset)
-emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
- else {
+ } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+// We need to be careful before reading the segment as `maxOffsetMetadata` may not be a complete metadata:
+// 1. If maxOffsetMetadata is message-offset-only, then return empty fetchDataInfo since
+// maxOffsetMetadata.offset is not on local log segments.
+// 2. If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
Looking at the code again, @showuon is correct. `convertToOffsetMetadataOrThrow(startOffset)` uses nextOffset, which always has the offset metadata. So we can keep the `convertToOffsetMetadataOrThrow(startOffset)` call.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608984545

## core/src/main/scala/kafka/log/LocalLog.scala:
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
s"but we only have log segments upto $endOffset.")
- if (startOffset == maxOffsetMetadata.messageOffset)
+ if (startOffset == maxOffsetMetadata.messageOffset) {
emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
- else if (startOffset > maxOffsetMetadata.messageOffset)
+ } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
Returning the current offset is correct. I am wondering why the suggested approach returns nextOffset. `segment.read(startOffset, maxLength, maxPosition, minOneMessage)` should return a non-null fetchDataInfo since the startOffset exists in the first segment, right?
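The condition under review decides when the read can return an empty result without touching the segment. The sketch below restates the three conditions from the diff in plain Java. It uses a hypothetical `OffsetMetadata` record in which a segment base offset of -1 marks message-offset-only metadata, and a `segmentBaseOffset` parameter standing in for `segmentOpt.get.baseOffset`. The field names follow the diff, but the types and the sentinel value are assumptions, not Kafka's `LogOffsetMetadata`.

```java
public class EmptyFetchDecision {
    // Hypothetical sentinel meaning "segment position unknown".
    static final long UNKNOWN_SEGMENT = -1L;

    // Hypothetical stand-in for offset metadata: a message offset plus the
    // base offset of the segment holding it, when that is known.
    record OffsetMetadata(long messageOffset, long segmentBaseOffset) {
        boolean messageOffsetOnly() {
            return segmentBaseOffset == UNKNOWN_SEGMENT;
        }
    }

    enum Outcome { EMPTY_AT_MAX, EMPTY, READ_SEGMENT }

    // Mirrors the if / else-if / else structure in the diff.
    static Outcome decide(long startOffset, OffsetMetadata max, long segmentBaseOffset) {
        if (startOffset == max.messageOffset()) {
            return Outcome.EMPTY_AT_MAX;
        } else if (startOffset > max.messageOffset()
                || max.messageOffsetOnly()
                || max.segmentBaseOffset() < segmentBaseOffset) {
            // The fetch offset is beyond the max metadata, the metadata is incomplete,
            // or it points into an older segment than the one that would be read.
            return Outcome.EMPTY;
        } else {
            return Outcome.READ_SEGMENT;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(50, new OffsetMetadata(100, 0), 0));
        System.out.println(decide(50, new OffsetMetadata(100, UNKNOWN_SEGMENT), 0));
    }
}
```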
[jira] [Comment Edited] (KAFKA-16779) Kafka retains logs past specified retention
[ https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847997#comment-17847997 ]

Nicholas Feinberg edited comment on KAFKA-16779 at 5/21/24 9:41 PM:
When we explicitly set topics' retention to 4d (345600000ms), our brokers immediately expired the surprisingly old logs. When we removed the setting, they began accumulating old logs again.

I've confirmed that the same setting is present in the brokers' `server.properties` file - that is, they have `log.retention.hours=96`. I've also checked and confirmed that topics do not have an explicitly set retention that would override this.

was (Author: nfeinberg):
When we explicitly set topics' retention to 4d (345600000ms), our brokers immediately expired the surprisingly old logs.

I've confirmed that the same setting is present in the brokers' `server.properties` file - that is, they have `log.retention.hours=96`. I've also checked and confirmed that topics do not have an explicitly set retention that would override this.

> Kafka retains logs past specified retention
> ---
>
> Key: KAFKA-16779
> URL: https://issues.apache.org/jira/browse/KAFKA-16779
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.7.0
> Reporter: Nicholas Feinberg
> Priority: Major
> Labels: expiration, retention
> Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz
>
>
> In a Kafka cluster with all topics set to four days of retention or longer (345600000ms), most brokers seem to be retaining six days of data.
> This is true even for topics which have high throughput (500MB/s, 50k msgs/s) and thus are regularly rolling new log segments. We observe this unexpectedly high retention both via disk usage statistics and by requesting the oldest available messages from Kafka.
> Some of these brokers crashed with an 'mmap failed' error (attached). When those brokers started up again, they returned to the expected four days of retention.
> Manually restarting brokers also seems to cause them to return to four days of retention. Demoting and promoting brokers only has this effect on a small part of the data hosted on a broker.
> These hosts had ~170GiB of free memory available. We saw no signs of pressure on either system or JVM heap memory before or after they reported this error. Committed memory seems to be around 10%, so this doesn't seem to be an overcommit issue.
> This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior to the upgrade, it was running on Kafka 2.4.
> We last reduced retention for ops on May 7th, after which we restored retention to our default of four days. This was the second time we've temporarily reduced and restored retention since the upgrade. This problem did not manifest the previous time we did so, nor did it manifest on our other Kafka 3.7 clusters.
> We are running on AWS [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We have 23 brokers, each with 24 disks. We're running in a JBOD configuration (i.e. unraided).
> Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, we're still using Zookeeper.
> Sample broker logs are attached. The 05-12 and 05-14 logs are from separate hosts. Please let me know if I can provide any further information.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
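Four days of retention is 345,600,000 ms, which is the same window as `log.retention.hours=96`. The six days actually observed in the report is 518,400,000 ms. A minimal Java check of these conversions using `java.util.concurrent.TimeUnit`:

```java
import java.util.concurrent.TimeUnit;

public class RetentionMath {
    // Convert a broker-style retention in hours to milliseconds.
    static long hoursToMs(long hours) {
        return TimeUnit.HOURS.toMillis(hours);
    }

    public static void main(String[] args) {
        long brokerDefault = hoursToMs(96);          // log.retention.hours=96
        long fourDays = TimeUnit.DAYS.toMillis(4);   // intended retention
        long sixDays = TimeUnit.DAYS.toMillis(6);    // retention observed in the report
        System.out.println(brokerDefault + " " + fourDays + " " + sixDays);
        System.out.println(brokerDefault == fourDays);
    }
}
```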
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608965614

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
@Test
public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
> if you approve I'd also backport the fix to 3.7

I'm on the fence about that, leaning towards yes. I regret backporting KAFKA-12468 so far and introducing this issue, and I didn't communicate it properly to users. I think you can backport this once you have a full release note written that can be backported at the same time.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608959573

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
@Test
public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
I have somewhat strong feelings, but I wouldn't call them very strong. If someone noticed this IDE warning and created a ticket and a PR to fix it, I would review that. Am I going to make the build enforce this warning? No, but I have seen other situations where the warning did point out real resource leaks... I just wanted to save the effort required to go and rework this later, and to prevent this PR from introducing an easily avoidable warning.

I agree with you about suppressing warnings; I don't think that is a healthy practice to have.

I just tried making this a try-with-resources and the indenting turned out fine. The body of backingStoreStart is at the exact same indentation as it is currently.
```
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
    @Override
    void backingStoreStart() {
        // read a sync during startup
        sync(tp, 100, 200);
        assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 0));
        assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 100));
        assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 200));
    }
}) {
    // no offsets exist and store is not started
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));

    // After the store is started all offsets are visible
    store.start(true);

    assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
}
```

Here's the Consumer alternative I thought about, which uses one fewer indentation level at the cost of a variable, a field, and two constructors:

```
Consumer<FakeOffsetSyncStore> init = store -> {
    // read a sync during startup
    store.sync(tp, 100, 200);
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
};
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore(init)) {
    // no offsets exist and store is not started
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));

    // After the store is started all offsets are visible
    store.start(true);

    assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
}
```

Either of these is preferable to having the warning or suppressing it.
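The try-with-resources form works because an anonymous subclass declared in the resource specification is still an `AutoCloseable`. The store is therefore closed automatically even though the subclass overrides a startup hook. The self-contained sketch below shows that pattern. `FakeStore` is a hypothetical stand-in for `FakeOffsetSyncStore`, and none of its methods are Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesDemo {
    // Records lifecycle events so the call order can be inspected.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for a test store with an overridable startup hook.
    static class FakeStore implements AutoCloseable {
        void start() {
            events.add("start");
            backingStoreStart();
        }

        // Hook that tests override to run code during startup.
        void backingStoreStart() {
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    static List<String> run() {
        events.clear();
        // The anonymous subclass is the resource itself, so close() runs when the block exits.
        try (FakeStore store = new FakeStore() {
            @Override
            void backingStoreStart() {
                events.add("hook");
            }
        }) {
            store.start();
            events.add("body");
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```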
Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]
ableegoldman commented on code in PR #16024:
URL: https://github.com/apache/kafka/pull/16024#discussion_r1608950503

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java:
@@ -0,0 +1,45 @@
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This is a simple container class used during the assignment process to distinguish
+ * TopicPartitions type. Since the assignment logic can depend on the type of topic we're
+ * looking at, and the rack information of the partition, this container class should have
+ * everything necessary to make informed task assignment decisions.
+ */
+public interface TopicPartitionAssignmentInfo {

Review Comment:
I mean, I do agree on keeping it scoped to the assignment, and putting it under the assignment package should be effective enough. I was thinking more along the lines of marking a clear distinction between interfaces/metadata that fall under the `ApplicationState`/input status vs. things that fall under the `TaskAssignment`/output category. Unfortunately "assignment" is just an overloaded term. Since it's correlated with assignor output in other class names, I think it's best not to also use it in class names to mean "related to the assignor", because putting it under the assignment package is another way to signal that.
[PR] KAFKA-16809: Run Javadoc in CI [kafka]
gharris1727 opened a new pull request, #16025:
URL: https://github.com/apache/kafka/pull/16025

Until now, Javadoc warnings could be added in PRs and were only noticed when someone else ran `./gradlew javadoc`. Now, introducing problems in Javadocs will immediately fail the PR CI build, notifying the contributor/committer when a PR has a Javadoc regression. This only applies on JDK 15+, where the `-Werror` parameter is added.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16739: Exclude protected members from aggregated release Javadocs [kafka]
gharris1727 merged PR #15940:
URL: https://github.com/apache/kafka/pull/15940
Re: [PR] KAFKA-16739: Exclude protected members from aggregated release Javadocs [kafka]
gharris1727 commented on PR #15940:
URL: https://github.com/apache/kafka/pull/15940#issuecomment-2123422940

Build passes, CI test failures appear unrelated, and this passes for me locally.
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608934073

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
}
@Test
-void testEnsureEventsAreCompleted() {

Review Comment:
Yes, the test was a little suspect in terms of its value-add, so I removed it. I was planning to file a Jira to move several of the tests (including this one) from `ConsumerNetworkThreadTest` to `ApplicationEventProcessorTest`. Then we could fix up some of the funkiness in this test as a separate task.
[jira] [Assigned] (KAFKA-16558) Implement HeartbeatRequestState.toStringBase()
[ https://issues.apache.org/jira/browse/KAFKA-16558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brenden DeLuna reassigned KAFKA-16558:
--
Assignee: Brenden DeLuna (was: Kirk True)

> Implement HeartbeatRequestState.toStringBase()
> --
>
> Key: KAFKA-16558
> URL: https://issues.apache.org/jira/browse/KAFKA-16558
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Brenden DeLuna
> Priority: Minor
> Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The inner class {{HeartbeatRequestState}} does not override the {{toStringBase()}} method. This affects debugging and troubleshooting consumer issues.
[jira] [Assigned] (KAFKA-16557) Fix OffsetFetchRequestState.toString()
[ https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brenden DeLuna reassigned KAFKA-16557:
--
Assignee: Brenden DeLuna (was: Kirk True)

> Fix OffsetFetchRequestState.toString()
> --
>
> Key: KAFKA-16557
> URL: https://issues.apache.org/jira/browse/KAFKA-16557
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Brenden DeLuna
> Priority: Minor
> Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The code incorrectly overrides the {{toString()}} method instead of overriding {{toStringBase()}}. This affects debugging and troubleshooting consumer issues.
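Both tickets above involve a template pattern. A base class builds `toString()` from an overridable `toStringBase()`. In a pattern like this, a subclass that overrides `toString()` directly loses the shared prefix and the base-class fields unless it reproduces them by hand. The self-contained sketch below shows the intended shape. The class names and fields are hypothetical and are not the actual consumer request-state classes. Marking `toString()` as `final`, as this sketch does, is a design choice that turns the mistake into a compile error.

```java
public class ToStringBaseDemo {
    // Hypothetical base class: toString() is fixed and delegates to an overridable hook.
    static class RequestState {
        protected final String owner;
        protected int attempts;

        RequestState(String owner) {
            this.owner = owner;
        }

        @Override
        public final String toString() {
            return getClass().getSimpleName() + "{" + toStringBase() + "}";
        }

        // Subclasses extend this hook instead of overriding toString().
        protected String toStringBase() {
            return "owner='" + owner + "', attempts=" + attempts;
        }
    }

    // Hypothetical subclass: appends its own state to the base fields.
    static class HeartbeatState extends RequestState {
        private final long intervalMs;

        HeartbeatState(String owner, long intervalMs) {
            super(owner);
            this.intervalMs = intervalMs;
        }

        @Override
        protected String toStringBase() {
            return super.toStringBase() + ", intervalMs=" + intervalMs;
        }
    }

    public static void main(String[] args) {
        System.out.println(new HeartbeatState("heartbeat", 3000));
    }
}
```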
[jira] [Assigned] (KAFKA-16000) Migrate MembershipManagerImplTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-16000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brenden DeLuna reassigned KAFKA-16000:
--
Assignee: Brenden DeLuna (was: Kirk True)

> Migrate MembershipManagerImplTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16000
> URL: https://issues.apache.org/jira/browse/KAFKA-16000
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, unit tests
> Reporter: Lucas Brutschy
> Assignee: Brenden DeLuna
> Priority: Minor
> Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>
[jira] [Assigned] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-15999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brenden DeLuna reassigned KAFKA-15999:
--
Assignee: Brenden DeLuna (was: Kirk True)

> Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
> -
>
> Key: KAFKA-15999
> URL: https://issues.apache.org/jira/browse/KAFKA-15999
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, unit tests
> Reporter: Lucas Brutschy
> Assignee: Brenden DeLuna
> Priority: Minor
> Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>
[jira] [Assigned] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brenden DeLuna reassigned KAFKA-16001:
--
Assignee: Brenden DeLuna

> Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16001
> URL: https://issues.apache.org/jira/browse/KAFKA-16001
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, unit tests
> Reporter: Lucas Brutschy
> Assignee: Brenden DeLuna
> Priority: Minor
> Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>
[jira] [Updated] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Philip Nee updated KAFKA-16001:
---
Description:
We should:
# Remove spy calls to the dependencies
# Remove ConsumerNetworkThreadTest

> Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16001
> URL: https://issues.apache.org/jira/browse/KAFKA-16001
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, unit tests
> Reporter: Lucas Brutschy
> Assignee: Brenden DeLuna
> Priority: Minor
> Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>
> We should:
> # Remove spy calls to the dependencies
> # Remove ConsumerNetworkThreadTest
Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]
apourchet commented on code in PR #16024:
URL: https://github.com/apache/kafka/pull/16024#discussion_r1608924234

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java:
@@ -0,0 +1,45 @@
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This is a simple container class used during the assignment process to distinguish
+ * TopicPartitions type. Since the assignment logic can depend on the type of topic we're
+ * looking at, and the rack information of the partition, this container class should have
+ * everything necessary to make informed task assignment decisions.
+ */
+public interface TopicPartitionAssignmentInfo {

Review Comment:
To elaborate on what I said: I believe that, generally speaking, interfaces that are too broad when created get super bloated over time, so I usually err on the side of small and precise, and expand if necessary.
Re: [PR] KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers [kafka]
cmccabe commented on code in PR #16006:
URL: https://github.com/apache/kafka/pull/16006#discussion_r1608921813

## core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
@@ -350,11 +347,7 @@ class ZkMetadataCache(
override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = {
val snapshot = metadataSnapshot
-brokerId match {
- case id if snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id == id) =>
-kraftControllerNodeMap.get(id)
- case _ => snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
-}
+snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))

Review Comment:
I recall why we did this now (I didn't originally)... During ZK migration, we wanted a way to make the controller node provider work with both ZK and KRaft controllers. The correct way to do this is just to consult `RaftManager` for the endpoint when we're dealing with a KRaft controller (which we do know). But at the time, we "hacked" it by adding the controllers to the cache as "brokers" for some functions but not others (ew).
Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]
apourchet commented on code in PR #16024:
URL: https://github.com/apache/kafka/pull/16024#discussion_r1608918902

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java:
@@ -0,0 +1,45 @@
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This is a simple container class used during the assignment process to distinguish
+ * TopicPartitions type. Since the assignment logic can depend on the type of topic we're
+ * looking at, and the rack information of the partition, this container class should have
+ * everything necessary to make informed task assignment decisions.
+ */
+public interface TopicPartitionAssignmentInfo {

Review Comment:
That makes sense. I thought that since it's metadata that will only be used for task assignment, having `Assignment` in the name was OK, but I'll change it to `TaskTopicPartition`.
Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]
cmccabe commented on PR #16008: URL: https://github.com/apache/kafka/pull/16008#issuecomment-2123394515 > Are there any tests that we should add or do the existing test cover this functionality? I think there are some that cover it already, but I added a test of controller failover in `KRaftClusterTest`, just to be sure.
Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]
ableegoldman commented on code in PR #16024: URL: https://github.com/apache/kafka/pull/16024#discussion_r1608917448 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java: ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.Optional; +import java.util.Set; + +/** + * This is a simple container class used during the assignment process to distinguish + * TopicPartitions type. Since the assignment logic can depend on the type of topic we're + * looking at, and the rack information of the partition, this container class should have + * everything necessary to make informed task assignment decisions. + */ +public interface TopicPartitionAssignmentInfo { +/** + * + * @return the string name of the topic. + */ +String topic(); + +/** + * + * @return the partition id of this topic partition. + */ +int partition(); Review Comment: nit: I'd return the TopicPartition directly rather than having APIs for the partition and topic separately.
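A minimal sketch of the nit, assuming a local `TopicPartition` record as a stand-in for Kafka's `org.apache.kafka.common.TopicPartition` (which exposes `topic()` and `partition()`): the interface returns one value instead of two separate accessors. The interface name follows the diff, and its other accessors are omitted.

```java
// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) {}

// One accessor returning the pair, instead of separate topic() and partition() methods.
interface TopicPartitionAssignmentInfo {
    TopicPartition topicPartition();
}
```

A caller can then use the value directly, for example as a map key, instead of rebuilding a `TopicPartition` from two accessors.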
Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]
ableegoldman commented on code in PR #16024: URL: https://github.com/apache/kafka/pull/16024#discussion_r1608911390 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TopicPartitionAssignmentInfo.java: ## @@ -0,0 +1,45 @@ +package org.apache.kafka.streams.processor.assignment; + +import java.util.Optional; +import java.util.Set; + +/** + * This is a simple container class used during the assignment process to distinguish + * TopicPartitions type. Since the assignment logic can depend on the type of topic we're + * looking at, and the rack information of the partition, this container class should have + * everything necessary to make informed task assignment decisions. + */ +public interface TopicPartitionAssignmentInfo { Review Comment: The only thing I take issue with is the name here. Technically this is just application metadata about how this topic partition fits into the topology, i.e. it's not really "assignment" info; we're just using it for assignment purposes. I'm not sure I'm explaining that well, and it sounds overly pedantic, but if I didn't bring it up now, someone would probably say the same thing in the KIP discussion once we send out this update. I know `TopicPartitionInfo` is already taken, and is too broad anyway. I'd recommend something like `TaskTopicPartition`, which I think helps drive home the point that this is metadata about how this topic partition relates to the given task, not info about the topic partition itself per se. Does that make any sense?
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() { } @Test -void testEnsureEventsAreCompleted() { Review Comment: Actually, it seems to me that we shouldn't have this test here (and maybe this is why @kirktrue removed it before?). As I see it, this unit test is testing something that is not the `ConsumerNetworkThread`'s responsibility (and that's why it ends up being complicated, having to mimic the reaper behaviour and use spying). It is testing that events are completed, and that's the responsibility of `reaper.reap`, so it seems to me we need to: 1. test that the `ConsumerNetworkThread` calls the reaper with the full list of events -> already done in [testCleanupInvokesReaper](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java#L331) 2. test that `CompletableEventReaper.reap(Collection events)` completes the events -> done in CompletableEventReaperTest ([testIncompleteQueue](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L135) and [testIncompleteTracked](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L170)) In the end, as it is, we end up asserting a behaviour we're mocking ourselves in the `doAnswer`, so I'd say it doesn't add much value. I agree with @cadonna that we need coverage, but I'd say we already have it with points 1 and 2, so this test should be removed. Makes sense?
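A minimal sketch of the split proposed above, using hypothetical simplified stand-ins (`Reaper`, `SimpleReaper`, `NetworkThreadSketch`) rather than the real `ConsumerNetworkThread` and `CompletableEventReaper`. One check covers only "the thread hands the full event list to the reaper"; another covers only "the reaper completes incomplete events". The real reaper's completion semantics may differ; here it simply completes leftovers exceptionally with a `TimeoutException`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified stand-ins; not the real consumer internals.
interface Reaper {
    void reap(Collection<CompletableFuture<Void>> events);
}

// Plays the role of the reaper: any event still pending gets completed.
final class SimpleReaper implements Reaper {
    @Override
    public void reap(Collection<CompletableFuture<Void>> events) {
        for (CompletableFuture<Void> event : events) {
            // No-op for events that are already complete.
            event.completeExceptionally(new TimeoutException("event reaped during cleanup"));
        }
    }
}

// Plays the role of the network thread: at cleanup it only hands its events to the reaper.
final class NetworkThreadSketch {
    private final Reaper reaper;
    private final List<CompletableFuture<Void>> events = new ArrayList<>();

    NetworkThreadSketch(Reaper reaper) {
        this.reaper = reaper;
    }

    void enqueue(CompletableFuture<Void> event) {
        events.add(event);
    }

    void cleanup() {
        reaper.reap(events);
    }
}
```

Each check then targets a single responsibility, so neither needs a spy or a `doAnswer` that re-implements the other component.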
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1608908254 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4209,31 +4241,67 @@ private void removePendingSyncMember( * @param context The request context. * @param request The actual Heartbeat request. * - * @return The Heartbeat response. + * @return The coordinator result that contains the heartbeat response. */ -public HeartbeatResponseData classicGroupHeartbeat( +public CoordinatorResult classicGroupHeartbeat( Review Comment: I guess it's because we'll now also read ConsumerGroup-related state, which is stored in timeline data structures.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608907673 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -82,77 +86,131 @@ public void testOffsetTranslation() { @Test public void testNoTranslationIfStoreNotStarted() { -try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { -// no offsets exist and store is not started -assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); -assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); -assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); +FakeOffsetSyncStore store = new FakeOffsetSyncStore() { Review Comment: On another note, if you approve, I'd also backport the fix to 3.7.
Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]
cmccabe commented on code in PR #16008: URL: https://github.com/apache/kafka/pull/16008#discussion_r1608907374 ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -313,4 +314,8 @@ class KafkaRaftManager[T]( override def leaderAndEpoch: LeaderAndEpoch = { client.leaderAndEpoch } + + override def idToNode(id: Int, listener: String): Option[Node] = { +client.idToNode(id, listener).toScala + } Review Comment: ack ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() { } } +public Optional<Node> idToNode(int id, String listener) { Review Comment: ack
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1608907132 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat( } } +/** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ +private CoordinatorResult classicGroupHeartbeatToConsumerGroup( +ConsumerGroup group, +RequestContext context, +HeartbeatRequestData request +) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); +ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + +scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + +Errors error = Errors.NONE; +if (member.memberEpoch() < group.groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) { +error = Errors.REBALANCE_IN_PROGRESS; +scheduleConsumerGroupJoinTimeout(groupId, memberId, member.rebalanceTimeoutMs()); Review Comment:
> we cancel the join timeout when we first convert to consumer group

We don't cancel the timeout, in case the conversion fails and the state needs to be reverted. The classic group join timeout does nothing if the group is a consumer group.

> when we have a group epoch bump we tell the classic group member we're rebalancing and they should send a join request

Yes, correct, and the timeout here is for the member rather than the whole group. For each member, the rebalance will be something like:
- heartbeat: if there's an ongoing rebalance, schedule the join timeout
- join: cancel the join timeout; schedule the sync timeout
- sync: cancel the sync timeout; maybe schedule a join timeout if a new rebalance is ongoing
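The per-member lifecycle in the steps above can be sketched as a small state table. This is a hypothetical simplification: `MemberRebalanceTimers` and its methods are illustrative names, not coordinator APIs. It only tracks which timeout is pending for each member, and the `rebalanceInProgress` flag stands in for the three REBALANCE_IN_PROGRESS conditions in the javadoc.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical bookkeeping only: records which timeout is pending for each member.
// The real coordinator schedules actual timer tasks instead.
final class MemberRebalanceTimers {
    enum Timeout { JOIN, SYNC }

    private final Map<String, Set<Timeout>> scheduled = new HashMap<>();

    private void schedule(String memberId, Timeout timeout) {
        scheduled.computeIfAbsent(memberId, id -> EnumSet.noneOf(Timeout.class)).add(timeout);
    }

    private void cancel(String memberId, Timeout timeout) {
        Set<Timeout> pending = scheduled.get(memberId);
        if (pending != null) {
            pending.remove(timeout);
        }
    }

    Set<Timeout> scheduledFor(String memberId) {
        return scheduled.getOrDefault(memberId, EnumSet.noneOf(Timeout.class));
    }

    // Heartbeat: during a rebalance, tell the member to rejoin and start its join timeout.
    // Returns true when the response should be REBALANCE_IN_PROGRESS.
    boolean onHeartbeat(String memberId, boolean rebalanceInProgress) {
        if (rebalanceInProgress) {
            schedule(memberId, Timeout.JOIN);
        }
        return rebalanceInProgress;
    }

    // Join: the member showed up, so drop the join timeout and wait for its sync.
    void onJoin(String memberId) {
        cancel(memberId, Timeout.JOIN);
        schedule(memberId, Timeout.SYNC);
    }

    // Sync: this round is done; arm a new join timeout only if another rebalance is ongoing.
    void onSync(String memberId, boolean newRebalanceOngoing) {
        cancel(memberId, Timeout.SYNC);
        if (newRebalanceOngoing) {
            schedule(memberId, Timeout.JOIN);
        }
    }
}
```

Because every timeout is keyed by member, one slow member only trips its own timer, which matches the point that the timeout is per member rather than per group.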
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608896699 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -82,77 +86,131 @@ public void testOffsetTranslation() { @Test public void testNoTranslationIfStoreNotStarted() { -try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { -// no offsets exist and store is not started -assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); -assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); -assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); +FakeOffsetSyncStore store = new FakeOffsetSyncStore() { Review Comment: Hi @gharris1727, I use IntelliJ too and saw the warning. I could have used a `@SuppressWarnings` annotation, but I am very reluctant to make code less readable to work around the limited insight of linters. The same goes for making the fake store more complex. Using try-with-resources with a local class results in horrible indentation, as you said. I don't share the strong worry about future leaks in tests; it seems speculative to me. In this instance, unless you have very strong feelings, I'd really leave the test as-is.
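For comparison, the two shapes under discussion look roughly like this. `FakeStore` is a hypothetical stand-in for the test's `FakeOffsetSyncStore`: try-with-resources guarantees `close()` at the cost of nesting the anonymous subclass inside the resource header, while a plain local variable stays flat but relies on the test to close the store.

```java
// Hypothetical stand-in for the test's FakeOffsetSyncStore.
class FakeStore implements AutoCloseable {
    boolean closed = false;

    boolean started() {
        return false;
    }

    @Override
    public void close() { // declares no checked exception, so no catch is needed
        closed = true;
    }
}

final class TryWithResourcesSketch {
    // Shape 1: try-with-resources guarantees close(), but the anonymous
    // subclass ends up nested inside the resource header.
    static FakeStore withTryWithResources() {
        FakeStore captured = null;
        try (FakeStore store = new FakeStore() {
            @Override
            boolean started() {
                return true;
            }
        }) {
            captured = store; // assertions against the store would go here
        }
        return captured;
    }

    // Shape 2: a plain local variable stays flat, but nothing closes the
    // store unless the test does so explicitly.
    static FakeStore withPlainLocal() {
        FakeStore store = new FakeStore() {
            @Override
            boolean started() {
                return true;
            }
        };
        return store; // assertions against the store would go here
    }
}
```

Both shapes exercise the same overridden behaviour; the only difference is who is responsible for calling `close()`.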
Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]
cmccabe commented on code in PR #16008: URL: https://github.com/apache/kafka/pull/16008#discussion_r1608906143 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -366,4 +368,16 @@ public static VoterSet fromInetSocketAddresses(String listener, Map idToNode(int id, String listener) { +VoterNode voterNode = voters.get(id); +if (voterNode == null) { +return Optional.empty(); +} +InetSocketAddress address = voterNode.listeners.get(listener); +if (address == null) { +return Optional.empty(); +} +return Optional.of(new Node(id, address.getHostString(), address.getPort())); +} Review Comment: ok ## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ## @@ -548,6 +550,10 @@ public VotedState votedStateOrThrow() { .orElseThrow(() -> new IllegalStateException("Expected to be Voted, but current state is " + state)); } +public Optional<Node> idToNode(int id, String listener) { +return latestVoterSet.get().idToNode(id, listener); +} Review Comment: removed
Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]
cmccabe commented on code in PR #16008: URL: https://github.com/apache/kafka/pull/16008#discussion_r1608904666 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() { } } +public Optional<Node> idToNode(int id, String listener) { +return quorum.idToNode(id, listener); Review Comment: ok
Re: [PR] KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers [kafka]
jsancio commented on code in PR #16006: URL: https://github.com/apache/kafka/pull/16006#discussion_r1608902290 ## core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala: ## @@ -350,11 +347,7 @@ class ZkMetadataCache( override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = { val snapshot = metadataSnapshot -brokerId match { - case id if snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id == id) => -kraftControllerNodeMap.get(id) - case _ => snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName)) -} +snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName)) Review Comment: Interesting. Do you know why this was added in the first place? @akhileshchg
Re: [PR] KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure [kafka]
ableegoldman merged PR #16002: URL: https://github.com/apache/kafka/pull/16002
Re: [PR] KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure [kafka]
ableegoldman commented on PR #16002: URL: https://github.com/apache/kafka/pull/16002#issuecomment-2123367972 Test failures are unrelated; merging to trunk. Worth noting that `org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest` did fail, which may seem related but cannot be, since this PR only adds new methods. Furthermore, I have seen this test fail occasionally in recent weeks, and while this is probably a real bug/failure (see [comment](https://issues.apache.org/jira/browse/KAFKA-16586?focusedCommentId=17848363=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17848363)), it seems to have been flaky since before we started work on KIP-924, so it must be unrelated. Just mentioning this in case we see it fail on future KIP-924 PRs that do more than just add new methods.
Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]
jsancio commented on code in PR #16008: URL: https://github.com/apache/kafka/pull/16008#discussion_r1608891761 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() { } } +public Optional<Node> idToNode(int id, String listener) { +return quorum.idToNode(id, listener); Review Comment: Let's use `partitionState` directly and remove the changes to `QuorumState`. ```java return partitionState.latestVoterSet().idToNode(id, listener); ``` ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() { } } +public Optional<Node> idToNode(int id, String listener) { Review Comment: Similar to my other comments, but how about `voterNode` for the name of the method? ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -313,4 +314,8 @@ class KafkaRaftManager[T]( override def leaderAndEpoch: LeaderAndEpoch = { client.leaderAndEpoch } + + override def idToNode(id: Int, listener: String): Option[Node] = { +client.idToNode(id, listener).toScala + } Review Comment: I'd prefer we call this something like `voterNode`, as it makes it clear that this only resolves to an `Option[Node]` if the id is a voter/controller. ## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ## @@ -548,6 +550,10 @@ public VotedState votedStateOrThrow() { .orElseThrow(() -> new IllegalStateException("Expected to be Voted, but current state is " + state)); } +public Optional<Node> idToNode(int id, String listener) { +return latestVoterSet.get().idToNode(id, listener); +} Review Comment: See my other comment, but I think we can remove this.
## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -366,4 +368,16 @@ public static VoterSet fromInetSocketAddresses(String listener, Map idToNode(int id, String listener) { +VoterNode voterNode = voters.get(id); +if (voterNode == null) { +return Optional.empty(); +} +InetSocketAddress address = voterNode.listeners.get(listener); +if (address == null) { +return Optional.empty(); +} +return Optional.of(new Node(id, address.getHostString(), address.getPort())); +} Review Comment: Okay. I added a similar method in this PR: https://github.com/apache/kafka/pull/15986/files#diff-7164c449a4cc53dd28cc1a7201fa8b7a824749dab013fb33dff101a1002565daR86-R101 If you agree, do you mind changing the name to `voterNode` to minimize the conflicts? Don't worry about using `ListenerName` instead of `String`, as my PR deals with that change.
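The `VoterSet` lookup from the diff can be sketched standalone under the suggested `voterNode` name. `Node`, `VoterNode`, and `VoterSetSketch` are hypothetical minimal stand-ins rather than Kafka's classes, and the `Optional.map` chain is simply an equivalent way of writing the diff's two null checks.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;

// Hypothetical minimal records; Kafka's Node and VoterSet's VoterNode carry more state.
record Node(int id, String host, int port) {}

record VoterNode(Map<String, InetSocketAddress> listeners) {}

final class VoterSetSketch {
    private final Map<Integer, VoterNode> voters;

    VoterSetSketch(Map<Integer, VoterNode> voters) {
        this.voters = voters;
    }

    // Only voters (controllers) resolve; an unknown id or a missing listener gives empty.
    Optional<Node> voterNode(int id, String listener) {
        return Optional.ofNullable(voters.get(id))
            .map(voter -> voter.listeners().get(listener))
            .map(address -> new Node(id, address.getHostString(), address.getPort()));
    }
}
```

`Optional.map` turns a `null` result into an empty `Optional`, which is what collapses the two early returns into one expression.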
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() { } @Test -void testEnsureEventsAreCompleted() { Review Comment: Actually, it seems to me that we shouldn't have this test here (and maybe this is why @kirktrue removed it before?). As I see it, this unit test is testing something that is not the `ConsumerNetworkThread`'s responsibility (and that's why it ends up being complicated, having to mimic the reaper behaviour and use spying). It is testing that events are completed, and that's the responsibility of `reaper.reap`, so it seems to me we need to: 1. test that the `ConsumerNetworkThread` calls the reaper with the full list of events -> already done in [testCleanupInvokesReaper](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java#L331) 2. test that `CompletableEventReaper.reap(Collection events)` completes the events -> done in CompletableEventReaperTest ([testIncompleteQueue](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L135) and [testIncompleteTracked](https://github.com/apache/kafka/blob/91af164415b8b950c70d3a61bb0837c34ae4ed69/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java#L170)). In the end, as it stands, we end up asserting a behaviour that we mock ourselves in the `doAnswer`, so I would say it doesn't add much value. I agree with @cadonna that we need coverage, but I would say that's what points 1 and 2 give us, and this test should be removed. Makes sense?
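To make that division of responsibility concrete, here is a minimal sketch of a reaper that, on its own, fails every expired and still-incomplete event, so it can be unit-tested directly instead of being mimicked with `doAnswer`. `ReaperSketch`, `TimedEvent`, and `SimpleReaper` are hypothetical names; this is not the API of Kafka's `CompletableEventReaper`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

public class ReaperSketch {

    // Hypothetical event: a future plus an absolute deadline in milliseconds.
    static final class TimedEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final long deadlineMs;

        TimedEvent(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    // Hypothetical reaper: fails every event that is still incomplete once its
    // deadline has passed, and returns how many events it completed.
    static final class SimpleReaper {
        int reap(Collection<TimedEvent> events, long nowMs) {
            int reaped = 0;
            for (TimedEvent event : events) {
                if (!event.future.isDone() && nowMs >= event.deadlineMs) {
                    event.future.completeExceptionally(
                        new TimeoutException("event expired at " + event.deadlineMs));
                    reaped++;
                }
            }
            return reaped;
        }
    }

    public static void main(String[] args) {
        List<TimedEvent> events = new ArrayList<>();
        events.add(new TimedEvent(100));
        events.add(new TimedEvent(500));
        int reaped = new SimpleReaper().reap(events, 200);
        System.out.println(reaped + " " + events.get(0).future.isCompletedExceptionally()
            + " " + events.get(1).future.isDone());
    }
}
```

With this split, the network-thread test only needs to check that the reaper is invoked, while the reaper's own test checks completion.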
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608896699 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -82,77 +86,131 @@ public void testOffsetTranslation() { @Test public void testNoTranslationIfStoreNotStarted() { -try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { -// no offsets exist and store is not started -assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); -assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); -assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); +FakeOffsetSyncStore store = new FakeOffsetSyncStore() { Review Comment: Hi @gharris1727 I use IntelliJ too and see the warning. I am reluctant to
[jira] [Created] (KAFKA-16809) Run javadoc build in CI
Greg Harris created KAFKA-16809: --- Summary: Run javadoc build in CI Key: KAFKA-16809 URL: https://issues.apache.org/jira/browse/KAFKA-16809 Project: Kafka Issue Type: Task Components: build, docs Reporter: Greg Harris Assignee: Greg Harris The `javadoc` target isn't run during CI builds, which allows javadoc errors to slip in. Instead, we can run javadoc as a pre-test step, alongside checkstyle, spotbugs, and import control, to ensure that PRs don't cause javadoc build regressions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1608875582 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -329,6 +334,17 @@ public void testCommitAsyncWithFencedException() { commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception()); assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> consumer.commitAsync()); + +// Close the consumer here as we know it will cause a FencedInstanceIdException to be thrown. +// If we get an error other than the FencedInstanceIdException, we'll raise a ruckus. +try { +consumer.close(); +} catch (KafkaException e) { +assertNotNull(e.getCause()); +assertInstanceOf(FencedInstanceIdException.class, e.getCause()); +} finally { +consumer = null; +} Review Comment: Yes, it turns out that changes made elsewhere have obviated the need for this check.
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1608870767 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -329,6 +334,17 @@ public void testCommitAsyncWithFencedException() { commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception()); assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> consumer.commitAsync()); + +// Close the consumer here as we know it will cause a FencedInstanceIdException to be thrown. +// If we get an error other than the FencedInstanceIdException, we'll raise a ruckus. +try { +consumer.close(); +} catch (KafkaException e) { +assertNotNull(e.getCause()); +assertInstanceOf(FencedInstanceIdException.class, e.getCause()); +} finally { +consumer = null; +} Review Comment: How did we resolve this? I see the section got removed completely; is the verification no longer needed?
Re: [PR] KAFKA-16713: Define initial set of RPCs for KIP-932 [kafka]
AndrewJSchofield commented on PR #16022: URL: https://github.com/apache/kafka/pull/16022#issuecomment-2123317868 Yes, we will wait until we cut the 3.8 branch before merging.
[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing
[ https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848363#comment-17848363 ] A. Sophie Blee-Goldman commented on KAFKA-16586: By the way – if this test is failing, it's highly likely that this is a real bug. This test was designed to be completely deterministic (hence the seed) and doesn't set up any actual clients or clusters or anything that might make it flaky. [~mjsax], if you see this again, can you report the failure here (including both the seed and the rackAwareStrategy parameter)? > Test TaskAssignorConvergenceTest failing > > > Key: KAFKA-16586 > URL: https://issues.apache.org/jira/browse/KAFKA-16586 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Priority: Major > > {code:java} > java.lang.AssertionError: Assertion failed in randomized test. Reproduce > with: `runRandomizedScenario(-538095696758490522)`. at > org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545) > at > org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code} > This might expose an actual bug (or incorrect test setup) and should be > reproducible (did not try it myself yet).
[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing
[ https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848358#comment-17848358 ] A. Sophie Blee-Goldman commented on KAFKA-16586: Note: when reporting a failure of this test, please include both the seed (contained in the stack trace, e.g. "runRandomizedScenario(-3595932977400264775)") and the full test name including parameters (not present in the original ticket description, e.g. "rackAwareStrategy=balance_subtopology"). We need the test name to figure out which variant of the assignor was running. This will help narrow the issue down to a specific rackAwareStrategy, as well as enable reproduction of the failed test.
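As a sketch of why the seed is enough to replay a randomized scenario: `java.util.Random` produces the same sequence for the same seed, so a generator that draws all of its choices from one seeded instance is fully reproducible. `SeededScenario` and `generatePerturbations` are hypothetical stand-ins, not `TaskAssignorConvergenceTest`'s actual code; the seed value is the one from the reported failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SeededScenario {

    // Hypothetical generator: every random choice comes from a single Random built
    // from the seed, so the same seed always yields the same perturbation sequence.
    static List<Integer> generatePerturbations(long seed, int steps) {
        Random random = new Random(seed);
        List<Integer> perturbations = new ArrayList<>();
        for (int i = 0; i < steps; i++) {
            perturbations.add(random.nextInt(100));
        }
        return perturbations;
    }

    public static void main(String[] args) {
        long seed = -3595932977400264775L; // seed taken from the reported failure
        // The seed fixes the scenario; the rackAwareStrategy parameter (not modelled here)
        // selects which assignor variant runs against it, so both are needed to reproduce.
        System.out.println(generatePerturbations(seed, 5).equals(generatePerturbations(seed, 5))); // true
    }
}
```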
Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]
cmccabe commented on PR #16008: URL: https://github.com/apache/kafka/pull/16008#issuecomment-2123296527 I have re-run all the test failures locally, and they all passed (they were flakes).
Re: [PR] KAFKA-16343: Add unit tests of foreignKeyJoin classes [kafka]
wcarlson5 merged PR #15564: URL: https://github.com/apache/kafka/pull/15564
Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]
rreddy-22 commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1608849959 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) { return targetAssignment.getOrDefault(memberId, Assignment.EMPTY); } +/** + * @return An immutable map containing all the topic partitions + * with their current member assignments. + */ +public Map> partitionAssignments() { +return Collections.unmodifiableMap(partitionAssignments); +} + /** * Updates target assignment of a member. * * @param memberId The member id. * @param newTargetAssignment The new target assignment. */ public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) { +updatePartitionAssignments( +memberId, +targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())), +newTargetAssignment +); targetAssignment.put(memberId, newTargetAssignment); } +/** + * Updates partition assignments of the topics. + * + * @param memberId The member Id. + * @param oldTargetAssignment The old target assignment. + * @param newTargetAssignment The new target assignment. + * + * Package private for testing. + */ +void updatePartitionAssignments( Review Comment: Used it in tests.
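A simplified sketch of what a method like `updatePartitionAssignments` has to do to keep a reverse (partition to member) map in sync when one member's target assignment changes. It assumes assignments are modelled as `Map<String, Set<Integer>>` (topic to partitions) and keys topics by `String` rather than `Uuid`; `ReverseAssignmentIndex` and `ownerOf` are hypothetical and this is not the PR's code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ReverseAssignmentIndex {

    // topic -> (partition -> owning member id)
    private final Map<String, Map<Integer, String>> partitionAssignments = new HashMap<>();

    // Moves the index from a member's old target assignment to its new one.
    void updatePartitionAssignments(
        String memberId,
        Map<String, Set<Integer>> oldAssignment,
        Map<String, Set<Integer>> newAssignment
    ) {
        // Drop partitions the member no longer owns, but only while the index still
        // points at this member (another member may already have taken them over).
        oldAssignment.forEach((topic, partitions) -> {
            Map<Integer, String> owners = partitionAssignments.get(topic);
            if (owners == null) return;
            Set<Integer> kept = newAssignment.getOrDefault(topic, Collections.emptySet());
            for (Integer partition : partitions) {
                if (!kept.contains(partition)) {
                    owners.remove(partition, memberId);
                }
            }
            if (owners.isEmpty()) partitionAssignments.remove(topic);
        });
        // Record every partition of the new assignment under this member.
        newAssignment.forEach((topic, partitions) -> {
            Map<Integer, String> owners =
                partitionAssignments.computeIfAbsent(topic, t -> new HashMap<>());
            for (Integer partition : partitions) {
                owners.put(partition, memberId);
            }
        });
    }

    // Returns the member owning the partition, or null if unassigned.
    String ownerOf(String topic, int partition) {
        return partitionAssignments.getOrDefault(topic, Collections.emptyMap()).get(partition);
    }
}
```

Keeping the method package-private, as in the PR, lets a test drive it directly with hand-built old and new assignments.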
Re: [PR] KAFKA-16343: Add unit tests of foreignKeyJoin classes [kafka]
wcarlson5 commented on PR #15564: URL: https://github.com/apache/kafka/pull/15564#issuecomment-2123293255 The failing tests are not related.
[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing
[ https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848351#comment-17848351 ] A. Sophie Blee-Goldman commented on KAFKA-16586: Failed again: {code:java} randomClusterPerturbationsShouldConverge[rackAwareStrategy=balance_subtopology] – org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest Stacktrace java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-3595932977400264775)`. {code}
[jira] [Commented] (KAFKA-14517) Implement regex subscriptions
[ https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848348#comment-17848348 ] Lianet Magrans commented on KAFKA-14517: Sure, I'll assign it to myself then. Thanks [~phuctran]! > Implement regex subscriptions > - > > Key: KAFKA-14517 > URL: https://issues.apache.org/jira/browse/KAFKA-14517 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-preview
[jira] [Assigned] (KAFKA-14517) Implement regex subscriptions
[ https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-14517: -- Assignee: Lianet Magrans (was: Phuc Hong Tran) > Implement regex subscriptions > - > > Key: KAFKA-14517 > URL: https://issues.apache.org/jira/browse/KAFKA-14517 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-preview
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1608834195 ## core/src/main/scala/kafka/log/LocalLog.scala: ## @@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File, throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + s"but we only have log segments upto $endOffset.") - if (startOffset == maxOffsetMetadata.messageOffset) + if (startOffset == maxOffsetMetadata.messageOffset) { emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns) - else if (startOffset > maxOffsetMetadata.messageOffset) -emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns) - else { + } else if (startOffset > maxOffsetMetadata.messageOffset || +maxOffsetMetadata.messageOffsetOnly() || +maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) { +// We need to be careful before reading the segment as `maxOffsetMetadata` may not be a complete metadata: +// 1. If maxOffsetMetadata is message-offset-only, then return empty fetchDataInfo since +// maxOffsetMetadata.offset is not on local log segments. +// 2. If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, then return empty fetchDataInfo. Review Comment: This comment is still pending. I will wait for @showuon's review before applying the change.
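The branch structure in this diff can be read as a small predicate deciding when to short-circuit with an empty fetch result. A hedged Java sketch follows: `OffsetMetadata` is a hypothetical stand-in for Kafka's `LogOffsetMetadata` that models only the fields the condition uses, and `UNKNOWN` is an assumed sentinel marking a message-offset-only value.

```java
public class FetchBoundaryCheck {

    // Assumed sentinel: a segment base offset of UNKNOWN marks a
    // "message-offset-only" metadata whose position in the log is not known.
    static final long UNKNOWN = -1L;

    // Hypothetical stand-in for LogOffsetMetadata.
    static final class OffsetMetadata {
        final long messageOffset;
        final long segmentBaseOffset;

        OffsetMetadata(long messageOffset, long segmentBaseOffset) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset == UNKNOWN;
        }
    }

    // Mirrors the diff: true when the read should return an empty fetch result
    // instead of reading from the segment that contains startOffset.
    static boolean shouldReturnEmpty(long startOffset, OffsetMetadata maxOffsetMetadata, long segmentBaseOffset) {
        if (startOffset == maxOffsetMetadata.messageOffset) {
            return true; // first branch: empty result positioned at maxOffsetMetadata
        }
        return startOffset > maxOffsetMetadata.messageOffset
            || maxOffsetMetadata.messageOffsetOnly()
            || maxOffsetMetadata.segmentBaseOffset < segmentBaseOffset;
    }
}
```

The predicate deliberately says nothing about which offset the empty result should carry; that is exactly the open question in the review thread.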
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1608831211 ## core/src/main/scala/kafka/log/LocalLog.scala: ## @@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File, throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + s"but we only have log segments upto $endOffset.") - if (startOffset == maxOffsetMetadata.messageOffset) + if (startOffset == maxOffsetMetadata.messageOffset) { emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns) - else if (startOffset > maxOffsetMetadata.messageOffset) + } else if (startOffset > maxOffsetMetadata.messageOffset || +maxOffsetMetadata.messageOffsetOnly() || +maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) { Review Comment: When applying this change, the newly added LocalLog#testWhenFetchOffsetHigherThanMaxOffset test fails for case 3 and case 4. In other words: which offset should be returned in the FetchDataInfo when these conditions aren't met? 1. In the current approach, the same fetch offset is returned in the FetchDataInfo. 2. In the suggested approach, the next offset/log-end offset is returned in the FetchDataInfo. I'm not sure which one is correct. Please advise. Thanks!
Re: [PR] KAFKA-16626: Lazily convert subscribed topic names to topic ids [kafka]
dajac commented on code in PR #15970: URL: https://github.com/apache/kafka/pull/15970#discussion_r1608815939 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java: ## @@ -53,6 +55,13 @@ public class OptimizedUniformAssignmentBuilderTest { private final String memberB = "B"; private final String memberC = "C"; +private final TopicsImage topicsImage = new MetadataImageBuilder() Review Comment: Do we still need this one? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicIds.java: ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; + +/** + * TopicIds is initialized with topic names (String) but exposes a Set of topic ids (Uuid) to the + * user and performs the conversion lazily with TopicsImage. 
+ */ +public class TopicIds implements Set<Uuid> { +private final Set<String> topicNames; +private final TopicsImage image; + +public TopicIds( +Set<String> topicNames, +TopicsImage image +) { +this.topicNames = Objects.requireNonNull(topicNames); +this.image = Objects.requireNonNull(image); +} + +@Override +public int size() { +return topicNames.size(); +} + +@Override +public boolean isEmpty() { +return topicNames.isEmpty(); +} + +@Override +public boolean contains(Object o) { +if (o instanceof Uuid) { +Uuid topicId = (Uuid) o; +TopicImage topicImage = image.getTopic(topicId); +if (topicImage == null) return false; +return topicNames.contains(topicImage.name()); +} +return false; +} + +private static class TopicIdIterator implements Iterator<Uuid> { +final Iterator<String> iterator; +final TopicsImage image; +private Uuid next = null; + +private TopicIdIterator( +Iterator<String> iterator, +TopicsImage image +) { +this.iterator = Objects.requireNonNull(iterator); +this.image = Objects.requireNonNull(image); +} + +@Override +public boolean hasNext() { +if (next != null) return true; +Uuid result = null; +do { +if (!iterator.hasNext()) { +return false; +} +String next = iterator.next(); +TopicImage topicImage = image.getTopic(next); +if (topicImage != null) { +result = topicImage.id(); +} +} while (result == null); +next = result; +return true; +} + +@Override +public Uuid next() { +if (!hasNext()) throw new NoSuchElementException(); +Uuid result = next; +next = null; +return result; +} +} + +@Override +public Iterator<Uuid> iterator() { +return new TopicIdIterator(topicNames.iterator(), image); +} + +@Override +public Object[] toArray() { +throw new UnsupportedOperationException(); +} + +@Override +public <T> T[] toArray(T[] a) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean add(Uuid o) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean remove(Object o) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean addAll(Collection<? extends Uuid> c) { +throw
new UnsupportedOperationException(); +} + +@Override +public void clear() { +throw new UnsupportedOperationException(); +} + +@Override +public boolean removeAll(Collection<?> c) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean
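The idea behind `TopicIds` (a `Set` of ids backed by a set of names and resolved only on access) can be sketched compactly with `java.util.AbstractSet`. This is a simplified illustration, not the PR's class: two plain maps stand in for `TopicsImage`, `java.util.UUID` stands in for Kafka's `Uuid`, and `LazyTopicIds` is a hypothetical name.

```java
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

public class LazyTopicIds extends AbstractSet<UUID> {
    private final Set<String> topicNames;
    private final Map<String, UUID> idsByName;   // stand-in for TopicsImage lookup by name
    private final Map<UUID, String> namesById;   // stand-in for TopicsImage lookup by id

    public LazyTopicIds(Set<String> topicNames, Map<String, UUID> idsByName, Map<UUID, String> namesById) {
        this.topicNames = Objects.requireNonNull(topicNames);
        this.idsByName = Objects.requireNonNull(idsByName);
        this.namesById = Objects.requireNonNull(namesById);
    }

    @Override
    public boolean contains(Object o) {
        if (!(o instanceof UUID)) return false;
        // Resolve the id to a name, then check membership by name.
        String name = namesById.get(o);
        return name != null && topicNames.contains(name);
    }

    @Override
    public Iterator<UUID> iterator() {
        Iterator<String> names = topicNames.iterator();
        return new Iterator<UUID>() {
            private UUID next = null;

            @Override
            public boolean hasNext() {
                // Skip names that have no id (e.g. topics missing from the metadata).
                while (next == null && names.hasNext()) {
                    next = idsByName.get(names.next());
                }
                return next != null;
            }

            @Override
            public UUID next() {
                if (!hasNext()) throw new NoSuchElementException();
                UUID result = next;
                next = null;
                return result;
            }
        };
    }

    @Override
    public int size() {
        // Count only resolvable names so that size() agrees with iteration.
        int count = 0;
        for (String name : topicNames) {
            if (idsByName.containsKey(name)) count++;
        }
        return count;
    }
}
```

`AbstractSet` already rejects `add` with `UnsupportedOperationException`, so the view stays read-only. One design choice differs from the diff: the PR's `size()` returns `topicNames.size()` in O(1), while this sketch pays a pass over the names to keep `size()` consistent with the iterator.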
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1608823231 ## storage/src/test/java/org/apache/kafka/tiered/storage/actions/EraseBrokerStorageAction.java: ## @@ -19,24 +19,35 @@ import org.apache.kafka.tiered.storage.TieredStorageTestAction; import org.apache.kafka.tiered.storage.TieredStorageTestContext; +import java.io.FilenameFilter; import java.io.IOException; import java.io.PrintStream; public final class EraseBrokerStorageAction implements TieredStorageTestAction { private final int brokerId; +private final FilenameFilter filenameFilter; +private final boolean isStopped; public EraseBrokerStorageAction(int brokerId) { +this(brokerId, (dir, name) -> true, false); +} + +public EraseBrokerStorageAction(int brokerId, +FilenameFilter filenameFilter, +boolean isStopped) { this.brokerId = brokerId; +this.filenameFilter = filenameFilter; +this.isStopped = isStopped; } @Override public void doExecute(TieredStorageTestContext context) throws IOException { -context.eraseBrokerStorage(brokerId); +context.eraseBrokerStorage(brokerId, filenameFilter, isStopped); } @Override public void describe(PrintStream output) { -output.println("erase-broker-storage: " + brokerId); +output.println("erase-broker-storage: " + brokerId + ", isStopped: " + isStopped); Review Comment: `filenameFilter` is a lambda expression, so printing it would only log its object name; that's why I omitted it from the output.
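For context on the `FilenameFilter` parameter: a lambda satisfies the functional interface `java.io.FilenameFilter` (`accept(File dir, String name)`), but its default `toString()` typically yields only a synthetic class name and hash, which is why printing it isn't informative. Below is a hypothetical sketch of how such a filter can scope an erase to matching files; `FilteredErase.eraseMatching` is illustrative and not `TieredStorageTestContext#eraseBrokerStorage`.

```java
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;

public class FilteredErase {

    // Hypothetical helper: deletes only the regular files in dir accepted by the filter
    // and returns how many were deleted.
    static int eraseMatching(File dir, FilenameFilter filter) throws IOException {
        File[] matches = dir.listFiles(filter);
        if (matches == null) throw new IOException("Not a directory: " + dir);
        int deleted = 0;
        for (File file : matches) {
            if (file.isFile()) {
                Files.delete(file.toPath());
                deleted++;
            }
        }
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("broker-storage").toFile();
        new File(dir, "00000000000000000000.log").createNewFile();
        new File(dir, "leader-epoch-checkpoint").createNewFile();
        // Erase only segment files; keep the checkpoint.
        int deleted = eraseMatching(dir, (d, name) -> name.endsWith(".log"));
        System.out.println(deleted + " " + dir.list().length);
    }
}
```

The default filter `(dir, name) -> true` in the diff's one-argument constructor corresponds to erasing everything.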
Re: [PR] KAFKA-16804: Replace archivesBaseName with archivesName. [kafka]
gharris1727 commented on PR #16016: URL: https://github.com/apache/kafka/pull/16016#issuecomment-2123252147 I verified that this change resolves the relevant build warnings and produces a published result identical to that of the current trunk implementation. Thank you @frankvicky for picking this up!
Re: [PR] KAFKA-16804: Replace archivesBaseName with archivesName. [kafka]
gharris1727 commented on code in PR #16016: URL: https://github.com/apache/kafka/pull/16016#discussion_r1608820947 ## build.gradle: ## @@ -341,7 +341,7 @@ subprojects { artifact task } -artifactId = archivesBaseName +artifactId = project.extensions.findByType(BasePluginExtension)?.archivesName?.get() Review Comment: I think this has the same effect, and looks a bit better. The safe navigation operator `?.` also suggests that this could be null, when it should never be null, and the publishing plugin can't handle a null anyway. ```suggestion artifactId = base.archivesName.get() ```
[jira] [Resolved] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-15242. - Assignee: (was: Alexander Aghili) Resolution: Duplicate > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlstibor Veljkovic >Priority: Major > > Using the mock processor context to get the forwarded message doesn't work. > Also, there is no well-documented way to test FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro] > but the most important piece is the test file with runtime and compile-time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
[jira] [Assigned] (KAFKA-15258) Consider moving MockAdminClient to the public API
[ https://issues.apache.org/jira/browse/KAFKA-15258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Muralidhar Basani reassigned KAFKA-15258: - Assignee: Muralidhar Basani > Consider moving MockAdminClient to the public API > - > > Key: KAFKA-15258 > URL: https://issues.apache.org/jira/browse/KAFKA-15258 > Project: Kafka > Issue Type: Task > Components: admin >Reporter: Mickael Maison >Assignee: Muralidhar Basani >Priority: Major > Labels: need-kip > > MockConsumer and MockProducer are part of the public API. They are useful for > developers wanting to test their applications. On the other hand > MockAdminClient is not part of the public API (it's under test). We should > consider moving it to src so users can also easily test applications that > depend on Admin.
Re: [PR] MINOR: Move Throttler to storage module [kafka]
chia7712 commented on code in PR #16023: URL: https://github.com/apache/kafka/pull/16023#discussion_r1608736058 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -109,12 +110,12 @@ class LogCleaner(initialConfig: CleanerConfig, private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel) /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */ - private[log] val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, -checkIntervalMs = 300, -throttleDown = true, -"cleaner-io", -"bytes", -time = time) + private[log] val throttler = new Throttler(config.maxIoBytesPerSecond, + 300, + true, Review Comment: It seems `throttleDown` is always true in production. Maybe we can remove it?
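For readers unfamiliar with the cleaner throttle, the core arithmetic of capping I/O at a maximum rate is roughly: if `observed` bytes moved in `elapsedMs`, sleep long enough that the average falls back to the desired rate. The sketch below assumes that model and only handles throttling down (the case the comment suggests is the only one used in production); `RateThrottleSketch` is hypothetical and not a copy of `Throttler`'s implementation.

```java
public class RateThrottleSketch {

    // Returns how long to sleep (ms) so that observedBytes over the interval averages
    // at most desiredBytesPerSec; zero if the observed rate is already at or below it.
    static long sleepMs(double desiredBytesPerSec, long observedBytes, long elapsedMs) {
        if (observedBytes <= 0 || desiredBytesPerSec <= 0) return 0;
        double desiredBytesPerMs = desiredBytesPerSec / 1000.0;
        // Time the observed bytes should have taken at the desired rate.
        double targetMs = observedBytes / desiredBytesPerMs;
        return Math.max(0L, Math.round(targetMs - elapsedMs));
    }

    public static void main(String[] args) {
        // 1 MB in 300 ms against a 1 MB/s cap: should have taken 1000 ms, so sleep 700 ms.
        System.out.println(sleepMs(1_000_000, 1_000_000, 300));
        // 100 KB in 300 ms is under the cap: no sleep.
        System.out.println(sleepMs(1_000_000, 100_000, 300));
    }
}
```

The 300 ms interval in the example matches the check interval passed to `Throttler` in the diff.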
[PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]
apourchet opened a new pull request, #16024:
URL: https://github.com/apache/kafka/pull/16024

For task assignment purposes, the user needs to have a set of information available for each topic partition affecting the desired tasks. This PR introduces a new interface for a read-only container class that allows all the important and relevant information to be found in one place.
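A read-only container like the one described is often built as an interface plus an immutable implementation that copies its inputs defensively. The sketch below is hypothetical. The type names and the field set (`topic`, `partition`, `isSource`, `isChangelog`, `rackIds`) are assumptions chosen for illustration and are not taken from the interface in PR #16024.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical read-only view of per-partition facts that a task assignor might need. */
interface AssignmentPartitionInfo {
    String topic();
    int partition();
    boolean isSource();      // assumed: the partition is read by a source node of the task
    boolean isChangelog();   // assumed: the partition backs a state store changelog
    Set<String> rackIds();   // assumed: racks hosting replicas of this partition
}

/** Immutable implementation: all state is fixed at construction. */
final class DefaultAssignmentPartitionInfo implements AssignmentPartitionInfo {
    private final String topic;
    private final int partition;
    private final boolean isSource;
    private final boolean isChangelog;
    private final Set<String> rackIds;

    DefaultAssignmentPartitionInfo(String topic, int partition, boolean isSource,
                                   boolean isChangelog, Set<String> rackIds) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.partition = partition;
        this.isSource = isSource;
        this.isChangelog = isChangelog;
        // Copy first, then wrap, so later changes to the caller's set are not visible here.
        this.rackIds = Collections.unmodifiableSet(new LinkedHashSet<>(rackIds));
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public boolean isSource() { return isSource; }
    @Override public boolean isChangelog() { return isChangelog; }
    @Override public Set<String> rackIds() { return rackIds; }
}
```

Exposing only accessors keeps assignors from mutating shared state. The defensive copy keeps the container stable even if the caller reuses its collections.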
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2123163240

@lianetm @cadonna, the latest batch of feedback has been addressed. Thanks!
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608732398

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1848,6 +1821,40 @@ private void subscribeInternal(Collection topics, Optional processor) {

Review Comment:
Done. That's much better.
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608730407

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * {@code CompletableEventReaper} is responsible for tracking {@link CompletableEvent time-bound events} and removing
+ * any that exceed their {@link CompletableEvent#deadlineMs() deadline} (unless they've already completed). This
+ * mechanism is used by the {@link AsyncKafkaConsumer} to enforce the timeout provided by the user in its API
+ * calls (e.g. {@link AsyncKafkaConsumer#commitSync(Duration)}).
+ */
+public class CompletableEventReaper {
+
+    private final Logger log;
+
+    /**
+     * List of tracked events that are candidates for expiration.
+     */
+    private final List<CompletableEvent<?>> tracked;
+
+    public CompletableEventReaper(LogContext logContext) {
+        this.log = logContext.logger(CompletableEventReaper.class);
+        this.tracked = new ArrayList<>();
+    }
+
+    /**
+     * Adds a new {@link CompletableEvent event} to track for later completion/expiration.
+     *
+     * @param event Event to track
+     */
+    public void add(CompletableEvent<?> event) {
+        tracked.add(Objects.requireNonNull(event, "Event to track must be non-null"));
+    }
+
+    /**
+     * This method performs a two-step process to "complete" {@link CompletableEvent events} that have either expired
+     * or completed normally:
+     *
+     *     For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+     *     instance of {@link TimeoutException} is created and passed to
+     *     {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *
+     *     For each tracked event of which its {@link CompletableEvent#future() future} is already in the
+     *     {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events.
+     *
+     * This method should be called at regular intervals, based upon the needs of the resource that owns the reaper.
+     *
+     * @param currentTimeMs Current time with which to compare against the
+     *                      {@link CompletableEvent#deadlineMs() expiration time}
+     */
+    public void reap(long currentTimeMs) {
+        Consumer<CompletableEvent<?>> expireEvent = event -> {
+            long pastDueMs = currentTimeMs - event.deadlineMs();
+            TimeoutException error = new TimeoutException(String.format("%s was %s ms past its expiration of %s", event.getClass().getSimpleName(), pastDueMs, event.deadlineMs()));
+
+            if (event.future().completeExceptionally(error)) {
+                log.debug("Event {} completed exceptionally since its expiration of {} passed {} ms ago", event, event.deadlineMs(), pastDueMs);
+            } else {
+                log.trace("Event {} not completed exceptionally since it was previously completed", event);
+            }
+        };
+
+        // First, complete (exceptionally) any events that have passed their deadline AND aren't already complete.
+        tracked.stream()
+                .filter(e -> !e.future().isDone())
+                .filter(e -> currentTimeMs > e.deadlineMs())

Review Comment:
Done.
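The two-step `reap` process described in the Javadoc can be sketched in isolation. The following Java code is a simplified, self-contained approximation and not the code from PR #15640. `TimedEvent` is a hypothetical stand-in for `CompletableEvent`, logging is omitted, and `java.util.concurrent.TimeoutException` replaces Kafka's `org.apache.kafka.common.errors.TimeoutException`. Like the original filter, the sketch treats an event as expired only when `currentTimeMs` is strictly greater than its deadline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for CompletableEvent: a future plus an absolute deadline in ms. */
final class TimedEvent<T> {
    private final CompletableFuture<T> future = new CompletableFuture<>();
    private final long deadlineMs;

    TimedEvent(long deadlineMs) { this.deadlineMs = deadlineMs; }

    CompletableFuture<T> future() { return future; }
    long deadlineMs() { return deadlineMs; }
}

/** Simplified reaper: fails overdue events, then forgets every event whose future is done. */
final class EventReaperSketch {
    private final List<TimedEvent<?>> tracked = new ArrayList<>();

    void add(TimedEvent<?> event) {
        tracked.add(Objects.requireNonNull(event, "Event to track must be non-null"));
    }

    /** Returns how many events this call completed exceptionally. */
    int reap(long currentTimeMs) {
        int expired = 0;
        // Step 1: complete exceptionally any event that is past its deadline and not yet done.
        for (TimedEvent<?> event : tracked) {
            if (!event.future().isDone() && currentTimeMs > event.deadlineMs()) {
                long pastDueMs = currentTimeMs - event.deadlineMs();
                TimeoutException error = new TimeoutException(
                        "Event was " + pastDueMs + " ms past its expiration of " + event.deadlineMs());
                // completeExceptionally returns false if another thread completed the future first.
                if (event.future().completeExceptionally(error))
                    expired++;
            }
        }
        // Step 2: drop every event that is now done (expired above or completed normally elsewhere).
        tracked.removeIf(event -> event.future().isDone());
        return expired;
    }

    int size() { return tracked.size(); }
}
```

Removing done events in a separate pass keeps the tracked list bounded. Events that finished normally before their deadline are cleaned up without being touched by the expiration step.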
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1607381004

## core/src/main/scala/kafka/server/DelayedFetch.scala: ##
@@ -92,7 +92,14 @@ class DelayedFetch(
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
             if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
+              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+                // If we don't know the position of the offset on log segments, just pessimistically assume that we
+                // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
+                // high-watermark is stale, but should be rare.
+                if (fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment:
We can organize the code a bit more clearly. I am thinking of something like the following.

```
if (fetchOffset.messageOffset > endOffset.messageOffset) {
  // Case F, this can happen when the new fetch operation is on a truncated leader
  debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
  return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
  if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
    // If we don't know the position of the offset on log segments, just pessimistically assume that we
    // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
    // high-watermark is stale, but should be rare.
    accumulatedSize += 1
  } else if (fetchOffset.onOlderSegment(endOffset)) {
    // Case F, this can happen when the fetch operation is falling behind the current segment
    // or the partition has just rolled a new segment
    debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
    // We will not force complete the fetch request if a replica should be throttled.
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      return forceComplete()
  } else if (fetchOffset.onSameSegment(endOffset)) {
    // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
    val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      accumulatedSize += bytesAvailable
  }
}
```

## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithInvalidHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contains the complete metadata

Review Comment:
does not contains => does not contain
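The branch ordering junrao proposes can be exercised outside the broker. The Java sketch below models only the offset-comparison decision. `OffsetMeta` is a hypothetical simplification of `LogOffsetMetadata`. `FetchDecision` stands in for the `forceComplete()` / `accumulatedSize` outcomes. The `throttled` flag stands in for "the request is from a follower and `shouldLeaderThrottle` returns true". None of these are Kafka classes. The usage checks mirror the test above, with `fetchOffset = 450` and a high watermark that carries only a message offset (0 or 500).

```java
/** Hypothetical simplification of LogOffsetMetadata: a message offset plus an optional segment position. */
final class OffsetMeta {
    static final long UNKNOWN_OFFSET = -1L;
    static final int UNKNOWN_POSITION = -1;

    final long messageOffset;
    final long segmentBaseOffset;   // UNKNOWN_OFFSET when only the message offset is known
    final int relativePosition;     // byte position inside the segment, UNKNOWN_POSITION if unknown

    OffsetMeta(long messageOffset) { this(messageOffset, UNKNOWN_OFFSET, UNKNOWN_POSITION); }

    OffsetMeta(long messageOffset, long segmentBaseOffset, int relativePosition) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePosition = relativePosition;
    }

    boolean messageOffsetOnly() { return segmentBaseOffset == UNKNOWN_OFFSET && relativePosition == UNKNOWN_POSITION; }
    boolean onOlderSegment(OffsetMeta that) { return segmentBaseOffset < that.segmentBaseOffset; }
    boolean onSameSegment(OffsetMeta that) { return segmentBaseOffset == that.segmentBaseOffset; }
    int positionDiff(OffsetMeta that) { return relativePosition - that.relativePosition; }
}

/** Outcome for one partition: complete the fetch now, or add some bytes to the running total. */
final class FetchDecision {
    final boolean forceComplete;
    final long bytesToAccumulate;

    private FetchDecision(boolean forceComplete, long bytesToAccumulate) {
        this.forceComplete = forceComplete;
        this.bytesToAccumulate = bytesToAccumulate;
    }

    static FetchDecision complete() { return new FetchDecision(true, 0L); }
    static FetchDecision accumulate(long bytes) { return new FetchDecision(false, bytes); }

    /** Compares message offsets first and consults segment metadata only when it is known. */
    static FetchDecision decide(OffsetMeta fetch, OffsetMeta end, int maxBytes, boolean throttled) {
        if (fetch.messageOffset > end.messageOffset) {
            return complete();                                  // Case F: e.g. fetching on a truncated leader
        } else if (fetch.messageOffset < end.messageOffset) {
            if (end.messageOffsetOnly() || fetch.messageOffsetOnly()) {
                return accumulate(1L);                          // position unknown: pessimistically assume 1 byte
            } else if (fetch.onOlderSegment(end)) {
                return throttled ? accumulate(0L) : complete(); // Case F: the fetch fell behind the active segment
            } else if (fetch.onSameSegment(end)) {
                long available = Math.min(end.positionDiff(fetch), maxBytes);
                return accumulate(throttled ? 0L : available);  // capped by the partition fetch size
            }
        }
        return accumulate(0L);                                  // equal offsets: nothing new to report yet
    }
}
```

Checking `fetchOffset > endOffset` before touching segment metadata means a stale, offset-only high watermark can no longer reach `onOlderSegment`. In this sketch, that comparison is only meaningful when both sides carry segment positions.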