[jira] [Updated] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?
[ https://issues.apache.org/jira/browse/KAFKA-15867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15867: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Bug) > Should ConsumerNetworkThread wrap the exception and notify the polling thread? > -- > > Key: KAFKA-15867 > URL: https://issues.apache.org/jira/browse/KAFKA-15867 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Priority: Minor > > The ConsumerNetworkThread runs a tight loop infinitely. However, when > encountering an unexpected exception, it logs an error and continues. > > I think this might not be ideal because user can run blind for a long time > before discovering there's something wrong with the code; so I believe we > should propagate the throwable back to the polling thread. > > cc [~lucasbru] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?
Philip Nee created KAFKA-15867: -- Summary: Should ConsumerNetworkThread wrap the exception and notify the polling thread? Key: KAFKA-15867 URL: https://issues.apache.org/jira/browse/KAFKA-15867 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee The ConsumerNetworkThread runs a tight loop infinitely. However, when encountering an unexpected exception, it logs an error and continues. I think this might not be ideal because user can run blind for a long time before discovering there's something wrong with the code; so I believe we should propagate the throwable back to the polling thread. cc [~lucasbru] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400134639 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -353,7 +380,8 @@ private void testRetriable(final CommitRequestManager commitRequestManger, final List>> futures) { futures.forEach(f -> assertFalse(f.isDone())); -time.sleep(500); +// The manager should backoff for 100ms +time.sleep(100); Review Comment: changed to 100 because we are now using jitter = 0, so the backoff during testing is deterministic and should always be 100ms -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on PR #14710: URL: https://github.com/apache/kafka/pull/14710#issuecomment-1820383565 Hey @lucasbru - Thanks again for taking the time reviewing the PR. I've made some cleanup and changes according to your comments. Let me know if you have further questions. I've rebased the trunk so worry about messing up the commits a bit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400134639 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -353,7 +380,8 @@ private void testRetriable(final CommitRequestManager commitRequestManger, final List>> futures) { futures.forEach(f -> assertFalse(f.isDone())); -time.sleep(500); +// The manager should backoff for 100ms +time.sleep(100); Review Comment: changed to 100 because we are now using jitter = 0, so the next backoff should always be 100ms ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -373,7 +401,6 @@ private static Stream offsetCommitExceptionSupplier() { Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false), Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, true), Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true), -Arguments.of(Errors.NOT_COORDINATOR, true), Review Comment: Both are removed due to duplication, similar to the error below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState
Philip Nee created KAFKA-15866: -- Summary: Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState Key: KAFKA-15866 URL: https://issues.apache.org/jira/browse/KAFKA-15866 Project: Kafka Issue Type: Improvement Reporter: Philip Nee The current OffsetFetchRequestState error handling uses nested if-else, which is quite different, stylistically, to the OffsetCommitRequestState using a switch statment. The latter is a bit more readable so we should refactor the error handling using the same style to improve readability. A minor point: Some of the error handling seems inconsistent with the commit. The logic was from the current implementation, so we should also review all the error handling. For example: somehow the current logic doesn't mark the coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState
[ https://issues.apache.org/jira/browse/KAFKA-15866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15866: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Improvement) > Refactor OffsetFetchRequestState Error handling to be more consistent with > OffsetCommitRequestState > --- > > Key: KAFKA-15866 > URL: https://issues.apache.org/jira/browse/KAFKA-15866 > Project: Kafka > Issue Type: Sub-task >Reporter: Philip Nee >Priority: Minor > > The current OffsetFetchRequestState error handling uses nested if-else, which > is quite different, stylistically, to the OffsetCommitRequestState using a > switch statment. The latter is a bit more readable so we should refactor the > error handling using the same style to improve readability. > > A minor point: Some of the error handling seems inconsistent with the commit. > The logic was from the current implementation, so we should also review all > the error handling. For example: somehow the current logic doesn't mark the > coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback
[ https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788298#comment-17788298 ] Philip Nee commented on KAFKA-15865: [~lucasbru] [~cadonna] - Would either of you be interested in taking this on? This is a pretty simple task. > Ensure consumer.poll() execute autocommit callback > -- > > Key: KAFKA-15865 > URL: https://issues.apache.org/jira/browse/KAFKA-15865 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > When the network thread completes autocommits, we need to send a > message/event to the application to notify the thread to execute the > callback. In KAFKA-15327, the network thread sends a > AutoCommitCompletionBackgroundEvent to the polling thread. The polling > thread should trigger the OffsetCommitCallback upon receiving it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400074015 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -257,10 +257,77 @@ private void closeInternal(final Duration timeout) { void cleanup() { log.trace("Closing the consumer network thread"); Timer timer = time.timer(closeTimeout); +waitForClosingTasks(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); closeQuietly(requestManagers, "request managers"); closeQuietly(networkClientDelegate, "network client delegate"); closeQuietly(applicationEventProcessor, "application event processor"); log.debug("Closed the consumer network thread"); } + +/** + * We need to autocommit before shutting down the consumer. The method needs to first connect to the coordinator + * node to construct the closing requests. Then wait for all closing requests to finish before returning. The + * method is bounded by a closing timer. We will continue closing down the consumer if the requests cannot be + * completed in time. + */ +// Visible for testing +void waitForClosingTasks(final Timer timer) { Review Comment: You are right. I think the original code is actually less confusing because sending auto commits needs to happen before closing down the `ConsumerCoordinator`. Now these tasks (closingTasks here) are the operations that need to happen before closing the network thread (Because we are shutting down the consumer). I wonder if I could make it less confusing by renaming the method. See the changes there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400115792 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -454,6 +491,17 @@ public OffsetFetchRequestState(final Set partitions, this.future = new CompletableFuture<>(); } +public OffsetFetchRequestState(final Set partitions, + final GroupState.Generation generation, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final double jitter) { +super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter); Review Comment: @lucasbru - Just FYI: I added this so that exponential backoff during testing doesn't have jitter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400064245 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -219,4 +219,14 @@ private void onResponse( public Optional coordinator() { return Optional.ofNullable(this.coordinator); } + +@Override +public NetworkClientDelegate.PollResult pollOnClose() { Review Comment: It was a hack to make sure poll can always send a find coordinator request. Giving it a second thought, I think just calling poll is fine because if there is an inflight request, we should poll the network client and wait. If we just sent a failed one, we should backoff and then resend. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect
[ https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788233#comment-17788233 ] Phuc Hong Tran edited comment on KAFKA-15556 at 11/21/23 6:23 AM: -- Hi [~kirktrue], just to clarify, with this Jira we want to check if we can remove {{networkClientDelegate.isUnavailable}}, {{networkClientDelegate.maybeThrowAuthFailure}} in {{FetchRequestManager}} and {{networkClientDelegate.tryConnect}} in {{OffsetsRequestManager}} with direct call to NetworkClientUtils version of those functions instead? was (Author: JIRAUSER301295): Hi [~kirktrue], just to clarify, with this Jira we want to remove "networkClientDelegate.isUnavailable", "networkClientDelegate.maybeThrowAuthFailure" in FetchRequestManager and "networkClientDelegate.tryConnect" in {{OffsetsRequestManager}} with direct call to NetworkClientUtils version of those functions instead? > Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, > and tryConnect > - > > Key: KAFKA-15556 > URL: https://issues.apache.org/jira/browse/KAFKA-15556 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor > > The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to > handle networking details in a more centralized way. However, in order to > reuse code between the existing {{KafkaConsumer}} and the new > {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the > {{NetworkClientDelegate}} capitulated and -stole- copied three methods from > {{ConsumerNetworkClient}} related to detecting node status: > # {{isUnavailable}} > # {{maybeThrowAuthFailure}} > # {{tryConnect}} > Unfortunately, these have found their way into the {{FetchRequestManager}} > and {{OffsetsRequestManager}} implementations. We should review if we can > clean up—or even remove—this leaky abstraction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400074015 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -257,10 +257,77 @@ private void closeInternal(final Duration timeout) { void cleanup() { log.trace("Closing the consumer network thread"); Timer timer = time.timer(closeTimeout); +waitForClosingTasks(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); closeQuietly(requestManagers, "request managers"); closeQuietly(networkClientDelegate, "network client delegate"); closeQuietly(applicationEventProcessor, "application event processor"); log.debug("Closed the consumer network thread"); } + +/** + * We need to autocommit before shutting down the consumer. The method needs to first connect to the coordinator + * node to construct the closing requests. Then wait for all closing requests to finish before returning. The + * method is bounded by a closing timer. We will continue closing down the consumer if the requests cannot be + * completed in time. + */ +// Visible for testing +void waitForClosingTasks(final Timer timer) { Review Comment: You are right. I think the original code is actually less confusing because sending auto commits needs to happen before closing down the `ConsumerCoordinator`. Now these tasks (closingTasks here) are the operations that need to happen before closing the network thread (Because we are shutting down the consumer). I wonder if structuring the runOnClose this way would make the code more readable ``` void runOnClose(Timer timer) { maybeAutoCommitAndLeaveGroup(timer); ... } -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback
[ https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15865: --- Labels: consumer-threading-refactor kip-848-preview (was: ) > Ensure consumer.poll() execute autocommit callback > -- > > Key: KAFKA-15865 > URL: https://issues.apache.org/jira/browse/KAFKA-15865 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > When the network thread completes autocommits, we need to send a > message/event to the application to notify the thread to execute the > callback. In KAFKA-15327, the network thread sends a > AutoCommitCompletionBackgroundEvent to the polling thread. The polling > thread should trigger the OffsetCommitCallback upon receiving it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback
[ https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15865: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Bug) > Ensure consumer.poll() execute autocommit callback > -- > > Key: KAFKA-15865 > URL: https://issues.apache.org/jira/browse/KAFKA-15865 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > When the network thread completes autocommits, we need to send a > message/event to the application to notify the thread to execute the > callback. In KAFKA-15327, the network thread sends a > AutoCommitCompletionBackgroundEvent to the polling thread. The polling > thread should trigger the OffsetCommitCallback upon receiving it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400070106 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AutoCommitCompletionBackgroundEvent.java: ## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class AutoCommitCompletionBackgroundEvent extends BackgroundEvent { Review Comment: Thanks for pointing this out. Would it be possible to followup with a separated PR? I filed a jira here: https://issues.apache.org/jira/browse/KAFKA-15865 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback
Philip Nee created KAFKA-15865: -- Summary: Ensure consumer.poll() execute autocommit callback Key: KAFKA-15865 URL: https://issues.apache.org/jira/browse/KAFKA-15865 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee When the network thread completes autocommits, we need to send a message/event to the application to notify the thread to execute the callback. In KAFKA-15327, the network thread sends a AutoCommitCompletionBackgroundEvent to the polling thread. The polling thread should trigger the OffsetCommitCallback upon receiving it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400064652 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -180,12 +188,47 @@ public void maybeAutoCommit(final Map offsets autocommit.setInflightCommitStatus(true); } +/** + * The consumer needs to send an auto commit during the shutdown if autocommit is enabled. + */ +Optional maybeAutoCommit() { +if (!autoCommitState.isPresent()) { +return Optional.empty(); +} + +OffsetCommitRequestState request = pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter); + request.future.whenComplete(autoCommitCallback(subscriptions.allConsumed())); +return Optional.of(request.toUnsentRequest()); +} + + // Visible for testing +CompletableFuture sendAutoCommit(final Map allConsumedOffsets) { Review Comment: changed to `private` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400064245 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -219,4 +219,14 @@ private void onResponse( public Optional coordinator() { return Optional.ofNullable(this.coordinator); } + +@Override +public NetworkClientDelegate.PollResult pollOnClose() { Review Comment: Deleted. I forgot to remove when removing the autocommit in the runOnClose... (sorry) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd commented on code in PR #14766: URL: https://github.com/apache/kafka/pull/14766#discussion_r1399008945 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,25 +1425,71 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { -def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) - - // Check not to delete segments which are not yet copied to tiered storage if remote log is enabled. - (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage)) && -// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start -// offset can never exceed it. -highWatermark >= upperBoundOffset && -predicate(segment, nextSegmentOpt) -} lock synchronized { - val deletable = localLog.deletableSegments(shouldDelete) + val deletable = deletableSegments(predicate) if (deletable.nonEmpty) deleteSegments(deletable, reason) else 0 } } + /** + * Find segments starting from the oldest until the user-supplied predicate is false. + * A final segment that is empty will never be returned. + * + * @param predicate A function that takes in a candidate log segment, the next higher segment + * (if there is one). It returns true iff the segment is deletable. + * @return the segments ready to be deleted + */ + private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { +def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = { + // Segments are eligible for deletion when: + //1. they are uploaded to the remote storage + if (remoteLogEnabled()) { +upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage + } else { +true + } +} + +if (localLog.segments.isEmpty) { + Seq.empty +} else { + val deletable = ArrayBuffer.empty[LogSegment] + val segmentsIterator = localLog.segments.values.iterator + var segmentOpt = nextOption(segmentsIterator) + var shouldRoll = false + while (segmentOpt.isDefined) { +val segment = segmentOpt.get +val nextSegmentOpt = nextOption(segmentsIterator) +val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0 +val upperBoundOffset = if (nextSegmentOpt.nonEmpty) nextSegmentOpt.get.baseOffset() else logEndOffset +// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start +// offset can never exceed it. +val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt) + +// Roll the active segment when it breaches the configured retention policy. The rolled segment will be +// eligible for deletion and gets removed in the next iteration. +if (predicateResult && !isLastSegmentAndEmpty && remoteLogEnabled() && nextSegmentOpt.isEmpty) { Review Comment: Can we have the below condition which is easy to understand or some thing better? ``` predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && segment.size > 0 // active segment is not empty ``` Another way is to keep the last segment's predicate result and its size and do the check after the while loop. ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +im
[jira] [Updated] (KAFKA-15864) Add more tests asserting the log-start-offset, local-log-start-offset, and HW/LSO/LEO in rolling over segments with tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15864: --- Description: Followup on the [comment|https://github.com/apache/kafka/pull/14766/files#r1395389551] > Add more tests asserting the log-start-offset, local-log-start-offset, and > HW/LSO/LEO in rolling over segments with tiered storage. > --- > > Key: KAFKA-15864 > URL: https://issues.apache.org/jira/browse/KAFKA-15864 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Satish Duggana >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > Followup on the > [comment|https://github.com/apache/kafka/pull/14766/files#r1395389551] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1400055966 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -90,14 +93,13 @@ public void run() { while (running) { try { runOnce(); -} catch (final WakeupException e) { -log.debug("WakeupException caught, consumer network thread won't be interrupted"); +} catch (final Throwable e) { +log.error("Unexpected error caught in consumer network thread", e); // swallow the wakeup exception to prevent killing the thread. Review Comment: Thanks, a question here: Should we actually swallow the exception or propagate the exception back to the application thread (wrap it with KafkaException via BackgroundEventHandler) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1820280356 Thanks @junrao for explaining the details. I have updated the PR and removed throttleMs from ClientMetricsManager. I have added a Jira to add respective throttling changes in QuotaManager. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15864) Add more tests asserting the log-start-offset, local-log-start-offset, and HW/LSO/LEO in rolling over segments with tiered storage.
Satish Duggana created KAFKA-15864: -- Summary: Add more tests asserting the log-start-offset, local-log-start-offset, and HW/LSO/LEO in rolling over segments with tiered storage. Key: KAFKA-15864 URL: https://issues.apache.org/jira/browse/KAFKA-15864 Project: Kafka Issue Type: Improvement Components: core Reporter: Satish Duggana Fix For: 3.7.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan merged PR #14387: URL: https://github.com/apache/kafka/pull/14387 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jolshan commented on PR #14387: URL: https://github.com/apache/kafka/pull/14387#issuecomment-1820273369 Build looks good now. I will merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1400049044 ## core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala: ## @@ -103,9 +103,16 @@ class DynamicConfigPublisher( ) case CLIENT_METRICS => // Apply changes to client metrics subscription. - info(s"Updating client metrics subscription ${resource.name()} with new configuration : " + -toLoggableProps(resource, props).mkString(",")) - dynamicConfigHandlers(ConfigType.ClientMetrics).processConfigChanges(resource.name(), props) + dynamicConfigHandlers.get(ConfigType.ClientMetrics).foreach(metricsConfigHandler => +try { + info(s"Updating client metrics ${resource.name()} with new configuration : " + +toLoggableProps(resource, props).mkString(",")) Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1400048884 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -16,31 +16,421 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. */ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); -private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; + +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final KafkaConfig config; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; -public static ClientMetricsManager instance() { -return INSTANCE; +public ClientMetricsManager(KafkaConfig config, Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.config = config; +this.time = time; } public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. +ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsReq
[jira] [Created] (KAFKA-15863) Handle push telemetry throttling with quota manager
Apoorv Mittal created KAFKA-15863: - Summary: Handle push telemetry throttling with quota manager Key: KAFKA-15863 URL: https://issues.apache.org/jira/browse/KAFKA-15863 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal Details: https://github.com/apache/kafka/pull/14699#discussion_r1399714279 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]
jeffkbkim commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1400044251 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); +/** + * The member metadata obtained from the assignment specification. + */ +private final Map members; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The list of all the topic Ids that the consumer group is subscribed to. + */ +private final Set subscriptionIds; + +/** + * Rack information. + */ +private final RackInfo rackInfo; + +/** + * List of subscribed members for each topic. + */ +private final Map> membersPerTopic; + +/** + * The partitions that still need to be assigned. + */ +private final Set unassignedPartitions; + +/** + * All the partitions that have been retained from the existing assignment. + */ +private final Set assignedStickyPartitions; + +/** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ +private final AssignmentManager assignmentManager; + +/** + * List of all the members sorted by their respective assignment sizes. + */ +private final TreeSet sortedMembersByAssignmentSize; + +/** + * Tracks the owner of each partition in the existing assignment on the client. + * + * Only populated when rack aware strategy is used. + * Contains partitions that weren't retained due to a rack mismatch. + */ +private final Map currentPartitionOwners; + +/** + * Tracks the owner of each partition in the target assignment. + */ +private final Map partitionOwnerInTargetAssignment; + +/** + * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + */ +private PartitionMovements partitionMovements; + +/** + * The new assignment that will be returned. + */ +private final Map targetAssignment; + +public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.members = assignmentSpec.members(); +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = assignmentSpec.members().values().stream() +.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream()) +.peek(topicId -> { +int partitionCount = subscribedTopicDescriber.numPartitions(topicId); +if (partitionCount == -1) { +throw new PartitionAssignorException( +"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." +); +} +}) +.collect(Collectors.toSet()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1400045702 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -16,31 +16,420 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; Review Comment: As I received merge conflict in the current PR hence merged the upstream changes which required to move the classes to write package. I have done this in current PR itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]
jeffkbkim commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1400045359 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); +/** + * The member metadata obtained from the assignment specification. + */ +private final Map members; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The list of all the topic Ids that the consumer group is subscribed to. + */ +private final Set subscriptionIds; + +/** + * Rack information. + */ +private final RackInfo rackInfo; + +/** + * List of subscribed members for each topic. + */ +private final Map> membersPerTopic; + +/** + * The partitions that still need to be assigned. + */ +private final Set unassignedPartitions; + +/** + * All the partitions that have been retained from the existing assignment. + */ +private final Set assignedStickyPartitions; + +/** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ +private final AssignmentManager assignmentManager; + +/** + * List of all the members sorted by their respective assignment sizes. + */ +private final TreeSet sortedMembersByAssignmentSize; + +/** + * Tracks the owner of each partition in the existing assignment on the client. + * + * Only populated when rack aware strategy is used. + * Contains partitions that weren't retained due to a rack mismatch. + */ +private final Map currentPartitionOwners; + +/** + * Tracks the owner of each partition in the target assignment. + */ +private final Map partitionOwnerInTargetAssignment; + +/** + * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + */ +private PartitionMovements partitionMovements; + +/** + * The new assignment that will be returned. + */ +private final Map targetAssignment; + +public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.members = assignmentSpec.members(); +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = assignmentSpec.members().values().stream() +.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream()) +.peek(topicId -> { +int partitionCount = subscribedTopicDescriber.numPartitions(topicId); +if (partitionCount == -1) { +throw new PartitionAssignorException( +"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." +); +} +}) +.collect(Collectors.toSet()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +
Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]
jeffkbkim commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1400044251 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); +/** + * The member metadata obtained from the assignment specification. + */ +private final Map members; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The list of all the topic Ids that the consumer group is subscribed to. + */ +private final Set subscriptionIds; + +/** + * Rack information. + */ +private final RackInfo rackInfo; + +/** + * List of subscribed members for each topic. + */ +private final Map> membersPerTopic; + +/** + * The partitions that still need to be assigned. + */ +private final Set unassignedPartitions; + +/** + * All the partitions that have been retained from the existing assignment. + */ +private final Set assignedStickyPartitions; + +/** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ +private final AssignmentManager assignmentManager; + +/** + * List of all the members sorted by their respective assignment sizes. + */ +private final TreeSet sortedMembersByAssignmentSize; + +/** + * Tracks the owner of each partition in the existing assignment on the client. + * + * Only populated when rack aware strategy is used. + * Contains partitions that weren't retained due to a rack mismatch. + */ +private final Map currentPartitionOwners; + +/** + * Tracks the owner of each partition in the target assignment. + */ +private final Map partitionOwnerInTargetAssignment; + +/** + * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + */ +private PartitionMovements partitionMovements; + +/** + * The new assignment that will be returned. + */ +private final Map targetAssignment; + +public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.members = assignmentSpec.members(); +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = assignmentSpec.members().values().stream() +.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream()) +.peek(topicId -> { +int partitionCount = subscribedTopicDescriber.numPartitions(topicId); +if (partitionCount == -1) { +throw new PartitionAssignorException( +"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." +); +} +}) +.collect(Collectors.toSet()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1400042116 ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; Review Comment: Updated the comment in the ReplicationControlManager. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041537 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -327,10 +333,12 @@ public ControllerResult registerBroker( ", but got cluster ID " + request.clusterId()); } int brokerId = request.brokerId(); +List records = new ArrayList<>(); BrokerRegistration existing = brokerRegistrations.get(brokerId); -if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) { -// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown. -log.debug("Received an unclean shutdown request"); +if (version >= 2 +&& (existing == null || request.previousBrokerEpoch() != existing.epoch()) +&& replicationControlManager != null) { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041854 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -1772,7 +1814,8 @@ void generateLeaderAndIsrUpdates(String context, // where there is an unclean leader election which chooses a leader from outside // the ISR. IntPredicate isAcceptableLeader = -r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r)); +r -> (r != brokerToRemove && r != brokerWithUncleanShutdown) Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041780 ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.apache.kafka.metadata.Replicas.NONE; + +public class BrokersToElrs { +private final SnapshotRegistry snapshotRegistry; + +private final TimelineHashMap> elrMembers; Review Comment: Done. It is broker id -> TopicIdPartitions -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15860: ControllerRegistration must be written out to the metadata image [kafka]
cmccabe opened a new pull request, #14807: URL: https://github.com/apache/kafka/pull/14807 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1400036881 ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -466,6 +466,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) { partition.lastKnownElr[0] != partition.leader)) { // Only update the last known leader when the first time the partition becomes leaderless. record.setLastKnownELR(Arrays.asList(partition.leader)); +} else if ((recordHasLeader(record) || partition.leader != NO_LEADER) Review Comment: Good catch! To improve the readability, revise to the following. `record.leader() > 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER)` `(Will have a new leader) || (will not become leaderless)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
apoorvmittal10 commented on PR #14632: URL: https://github.com/apache/kafka/pull/14632#issuecomment-1820231138 > Seems there is a compilation error on the last run: > > ``` > > [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14632/core/src/main/scala/kafka/server/DynamicConfig.scala:116:47: object ClientMetricsConfigs is not a member of package kafka.metrics > > [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14632/core/src/main/scala/kafka/server/DynamicConfig.scala:116:17: private val clientConfigs in object ClientMetrics is never used > ``` The change in dependent classes broke the PR build. I have resolved merge issue and triggered build. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1400016403 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = { +metadataCache match { + case _: ZkMetadataCache => +throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request") + case _ => +} +val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache] + +val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data() +val topicSet = mutable.SortedSet[String]() +describeTopicPartitionsRequest.topics().forEach(topic => topicSet.add(topic.name())) +val topics = ListBuffer[String]().addAll(topicSet) + +val cursor = describeTopicPartitionsRequest.cursor() +val fetchAllTopics = topics.isEmpty +if (fetchAllTopics) { + kRaftMetadataCache.getAllTopics().foreach(topic => topics.append(topic)) Review Comment: But if you ask whether it is worth the effort to create the full set of underline structures to get an ordered list where we can just sort the topic list, I am not sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1400016403 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = { +metadataCache match { + case _: ZkMetadataCache => +throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request") + case _ => +} +val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache] + +val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data() +val topicSet = mutable.SortedSet[String]() +describeTopicPartitionsRequest.topics().forEach(topic => topicSet.add(topic.name())) +val topics = ListBuffer[String]().addAll(topicSet) + +val cursor = describeTopicPartitionsRequest.cursor() +val fetchAllTopics = topics.isEmpty +if (fetchAllTopics) { + kRaftMetadataCache.getAllTopics().foreach(topic => topics.append(topic)) Review Comment: But if you ask whether it is worth the effort to create the full set of underline structures to get an ordered list where we can just sort the list, I am not sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1400015238 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = { +metadataCache match { + case _: ZkMetadataCache => +throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request") + case _ => +} +val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache] + +val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data() +val topicSet = mutable.SortedSet[String]() +describeTopicPartitionsRequest.topics().forEach(topic => topicSet.add(topic.name())) +val topics = ListBuffer[String]().addAll(topicSet) + +val cursor = describeTopicPartitionsRequest.cursor() +val fetchAllTopics = topics.isEmpty +if (fetchAllTopics) { + kRaftMetadataCache.getAllTopics().foreach(topic => topics.append(topic)) Review Comment: In the fetch all path, no additional sort is required. I did not see a good way to convert Java list to a scala mutable list, so I did the copy. Use a mutable list for 2 reasons 1. It is easier to filter out the topics alphabetically ahead of the cursor topic 2. In the fetch all case, I think we should still include the cursor topic in the response if it does not exist. Mutable list make it easier. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1400011898 ## clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json: ## @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 74, + "type": "response", + "name": "DescribeTopicPartitionsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, +{ "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+", + "about": "Each topic in the response.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", +"about": "The topic error, or 0 if there was no error." }, + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+", +"about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." }, + { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true, +"about": "True if the topic is internal." }, + { "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+", +"about": "Each partition in the topic.", "fields": [ +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The partition error, or 0 if there was no error." }, +{ "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, +{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the leader broker." }, +{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true, + "about": "The leader epoch of this partition." }, +{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of all nodes that host this partition." }, +{ "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of nodes that are in sync with the leader for this partition." }, +{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId", + "versions": "0+", "nullableVersions": "0+", + "about": "The new eligible leader replicas otherwise." }, +{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", + "versions": "0+", "nullableVersions": "0+", + "about": "The last known ELR." }, +{ "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId", + "about": "The set of offline replicas of this partition." }]}, + { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", +"about": "32-bit bitfield to represent authorized operations for this topic." }] +}, +{ "name": "NextTopicPartition", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null", Review Comment: Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]
rreddy-22 commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1399989044 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); +/** + * The member metadata obtained from the assignment specification. + */ +private final Map members; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The list of all the topic Ids that the consumer group is subscribed to. + */ +private final Set subscriptionIds; + +/** + * Rack information. + */ +private final RackInfo rackInfo; + +/** + * List of subscribed members for each topic. + */ +private final Map> membersPerTopic; + +/** + * The partitions that still need to be assigned. + */ +private final Set unassignedPartitions; + +/** + * All the partitions that have been retained from the existing assignment. + */ +private final Set assignedStickyPartitions; + +/** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ +private final AssignmentManager assignmentManager; + +/** + * List of all the members sorted by their respective assignment sizes. + */ +private final TreeSet sortedMembersByAssignmentSize; + +/** + * Tracks the owner of each partition in the existing assignment on the client. + * + * Only populated when rack aware strategy is used. + * Contains partitions that weren't retained due to a rack mismatch. + */ +private final Map currentPartitionOwners; + +/** + * Tracks the owner of each partition in the target assignment. + */ +private final Map partitionOwnerInTargetAssignment; + +/** + * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + */ +private PartitionMovements partitionMovements; + +/** + * The new assignment that will be returned. + */ +private final Map targetAssignment; + +public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.members = assignmentSpec.members(); +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = assignmentSpec.members().values().stream() +.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream()) +.peek(topicId -> { +int partitionCount = subscribedTopicDescriber.numPartitions(topicId); +if (partitionCount == -1) { +throw new PartitionAssignorException( +"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." +); +} +}) +.collect(Collectors.toSet()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +
Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]
rreddy-22 commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1399988448 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); +/** + * The member metadata obtained from the assignment specification. + */ +private final Map members; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The list of all the topic Ids that the consumer group is subscribed to. + */ +private final Set subscriptionIds; + +/** + * Rack information. + */ +private final RackInfo rackInfo; + +/** + * List of subscribed members for each topic. + */ +private final Map> membersPerTopic; + +/** + * The partitions that still need to be assigned. + */ +private final Set unassignedPartitions; + +/** + * All the partitions that have been retained from the existing assignment. + */ +private final Set assignedStickyPartitions; + +/** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ +private final AssignmentManager assignmentManager; + +/** + * List of all the members sorted by their respective assignment sizes. + */ +private final TreeSet sortedMembersByAssignmentSize; + +/** + * Tracks the owner of each partition in the existing assignment on the client. + * + * Only populated when rack aware strategy is used. + * Contains partitions that weren't retained due to a rack mismatch. + */ +private final Map currentPartitionOwners; + +/** + * Tracks the owner of each partition in the target assignment. + */ +private final Map partitionOwnerInTargetAssignment; + +/** + * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + */ +private PartitionMovements partitionMovements; + +/** + * The new assignment that will be returned. + */ +private final Map targetAssignment; + +public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.members = assignmentSpec.members(); +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = assignmentSpec.members().values().stream() +.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream()) +.peek(topicId -> { +int partitionCount = subscribedTopicDescriber.numPartitions(topicId); +if (partitionCount == -1) { +throw new PartitionAssignorException( +"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." +); +} +}) +.collect(Collectors.toSet()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1399973308 ## streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java: ## @@ -91,11 +91,13 @@ private KeyValueStore maybeWrapLogging(final KeyValueStore, Bytes, byte[]> Review Comment: ah, ok, that makes sense. In that case yeah, can you just make a quick pass and update the calls in this? thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect
[ https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788233#comment-17788233 ] Phuc Hong Tran commented on KAFKA-15556: Hi [~kirktrue], just to clarify, with this Jira we want to remove "networkClientDelegate.isUnavailable", "networkClientDelegate.maybeThrowAuthFailure" in FetchRequestManager and "networkClientDelegate.tryConnect" in {{OffsetsRequestManager}} with direct call to NetworkClientUtils version of those functions instead? > Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, > and tryConnect > - > > Key: KAFKA-15556 > URL: https://issues.apache.org/jira/browse/KAFKA-15556 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor > > The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to > handle networking details in a more centralized way. However, in order to > reuse code between the existing {{KafkaConsumer}} and the new > {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the > {{NetworkClientDelegate}} capitulated and -stole- copied three methods from > {{ConsumerNetworkClient}} related to detecting node status: > # {{isUnavailable}} > # {{maybeThrowAuthFailure}} > # {{tryConnect}} > Unfortunately, these have found their way into the {{FetchRequestManager}} > and {{OffsetsRequestManager}} implementations. We should review if we can > clean up—or even remove—this leaky abstraction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
agavra commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1399939069 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java: ## @@ -18,16 +18,30 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.state.DslStoreSuppliers; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import java.util.Map; public class StreamJoinedInternal extends StreamJoined { +private final InternalStreamsBuilder builder; + //Needs to be public for testing -public StreamJoinedInternal(final StreamJoined streamJoined) { +public StreamJoinedInternal( +final StreamJoined streamJoined, +final InternalStreamsBuilder builder +) { super(streamJoined); +this.builder = builder; +if (dslStoreSuppliers == null) { +final TopologyConfig topologyConfig = builder.internalTopologyBuilder.topologyConfigs(); +if (topologyConfig != null) { +dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null); +} Review Comment: nice catch! added a test and fixed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
artemlivshits commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1399872558 ## clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json: ## @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 74, + "type": "response", + "name": "DescribeTopicPartitionsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, +{ "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+", + "about": "Each topic in the response.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", +"about": "The topic error, or 0 if there was no error." }, + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+", +"about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." }, + { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true, +"about": "True if the topic is internal." }, + { "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+", +"about": "Each partition in the topic.", "fields": [ +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The partition error, or 0 if there was no error." }, +{ "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, +{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the leader broker." }, +{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true, + "about": "The leader epoch of this partition." }, +{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of all nodes that host this partition." }, +{ "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of nodes that are in sync with the leader for this partition." }, +{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId", + "versions": "0+", "nullableVersions": "0+", + "about": "The new eligible leader replicas otherwise." }, +{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", + "versions": "0+", "nullableVersions": "0+", + "about": "The last known ELR." }, +{ "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId", + "about": "The set of offline replicas of this partition." }]}, + { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", +"about": "32-bit bitfield to represent authorized operations for this topic." }] +}, +{ "name": "NextTopicPartition", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null", Review Comment: If we call it Cursor in the request, I think we should call it NextCursor here. ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = { +metadataCache match { + case _: ZkMetadataCache => +throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request") + case _ => +} +val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache] + +val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data() +val topicSet = mutable.SortedSet
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
agavra commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1399930568 ## streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java: ## @@ -91,11 +91,13 @@ private KeyValueStore maybeWrapLogging(final KeyValueStore, Bytes, byte[]> Review Comment: yes it was, we needed it in order to pass the tests that I added which check that `InMemoryXStore` is in the store heirarchy - otherwise it would stop at this change ## streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java: ## @@ -91,11 +91,13 @@ private KeyValueStore maybeWrapLogging(final KeyValueStore, Bytes, byte[]> Review Comment: yes it was, we needed it in order to pass the tests that I added which check that `InMemoryXStore` is in the store heirarchy - otherwise it would stop at this level -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
mjsax commented on PR #14632: URL: https://github.com/apache/kafka/pull/14632#issuecomment-1820056764 Seems there is a compilation error on the last run: ```ask :core:compileScala [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14632/core/src/main/scala/kafka/server/DynamicConfig.scala:116:47: object ClientMetricsConfigs is not a member of package kafka.metrics [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14632/core/src/main/scala/kafka/server/DynamicConfig.scala:116:17: private val clientConfigs in object ClientMetrics is never used ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15862) Remove SecurityManager Support
Greg Harris created KAFKA-15862: --- Summary: Remove SecurityManager Support Key: KAFKA-15862 URL: https://issues.apache.org/jira/browse/KAFKA-15862 Project: Kafka Issue Type: New Feature Components: clients, KafkaConnect, Tiered-Storage Reporter: Greg Harris Assignee: Greg Harris https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]
cmccabe commented on PR #14737: URL: https://github.com/apache/kafka/pull/14737#issuecomment-1820033474 @soarez : This PR is causing failures in `KRaftClusterTest.testCreateClusterAndPerformReassignment` ``` Gradle Test Run :core:test > Gradle Test Executor 2 > KRaftClusterTest > testCreateClusterAndPerformReassignment() FAILED java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment(KRaftClusterTest.scala:479) Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15861) In Kraft mode, "ssl.keystore.key" private keys are accesible to all the controllers and brokers
Jesús Cea created KAFKA-15861: - Summary: In Kraft mode, "ssl.keystore.key" private keys are accesible to all the controllers and brokers Key: KAFKA-15861 URL: https://issues.apache.org/jira/browse/KAFKA-15861 Project: Kafka Issue Type: Improvement Affects Versions: 3.6.0 Reporter: Jesús Cea Kafka allow dynamic updates of the TLS keys using "{color:#00}ssl.keystore.key" and "{color:#00}ssl.keystore.certificate.chain{color}". In a KRaft cluster, that data is distributed to the entire cluster, so the private keys of the X509 certificates are widely shared.{color} To test this, you could propagate a X.509 certificate update via "kafka-configs" for a particular server and then use "{color:#00}kafka-metadata-shell.sh" to verify that the new certificate is openly shared with all the cluster servers (controllers and brokers) (under "{color:#00}image/configs/BROKER:X/listener.name.X.ssl.keystore.key{color}"){color} You can also verify this doing a "strings" to the "__cluster_metadata-0" topic log files and "grep" the PEM private key. Expected result: I understand the need of the replicated metadata in KRaft mode, but the X.509 private key should be shared encrypted with "password.encoder.secret", so only the relevant broker is able to decrypt the certificate private key, although all the cluster has access to the "opaque" encrypted data. If each broker has a (different) high quality "password.encoder.secret", the encrypted private key should be safe to replicate. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Always send cumulative failed dirs in HB request [kafka]
cmccabe merged PR #14770: URL: https://github.com/apache/kafka/pull/14770 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15860) ControllerRegistration must be written out to the metadata image
Colin McCabe created KAFKA-15860: Summary: ControllerRegistration must be written out to the metadata image Key: KAFKA-15860 URL: https://issues.apache.org/jira/browse/KAFKA-15860 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Colin McCabe Assignee: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1399714279 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -16,31 +16,421 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. */ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); -private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; + +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final KafkaConfig config; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; -public static ClientMetricsManager instance() { -return INSTANCE; +public ClientMetricsManager(KafkaConfig config, Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.config = config; +this.time = time; } public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. +ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest req
Re: [PR] MINOR: Mark ConsumerGroupHeartbeat API (v1), OffsetCommit API (v9) and OffsetFetch API (v9) as stable (KIP-848) [kafka]
jolshan commented on PR #14801: URL: https://github.com/apache/kafka/pull/14801#issuecomment-1819987867 https://github.com/apache/kafka/assets/25566826/3f0c8239-2002-4d66-8455-41f299419738";> Somehow the tests are fixed and existing failures for java 21🤔 (Again, don't think it is your change, but something probably should be done with the CI output as we discussed offline today) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Mark ConsumerGroupHeartbeat API (v1), OffsetCommit API (v9) and OffsetFetch API (v9) as stable (KIP-848) [kafka]
jolshan commented on PR #14801: URL: https://github.com/apache/kafka/pull/14801#issuecomment-1819985912 The failures here are out of control. I don't think it is related to your change. I will rebuild. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
artemlivshits commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1399798771 ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -466,6 +466,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) { partition.lastKnownElr[0] != partition.leader)) { // Only update the last known leader when the first time the partition becomes leaderless. record.setLastKnownELR(Arrays.asList(partition.leader)); +} else if ((recordHasLeader(record) || partition.leader != NO_LEADER) Review Comment: Can we have a case when `record.leader() == NO_LEADER && partition.leader != NO_LEADER` (i.e. partion has a valid leader but will not have a valid leader)? Would a more proper condition be `(recordHasLeader(record) || (partition.leader != NO_LEADER && record.leader() == NO_LEADER_CHANGE))` ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -1772,7 +1814,8 @@ void generateLeaderAndIsrUpdates(String context, // where there is an unclean leader election which chooses a leader from outside // the ISR. IntPredicate isAcceptableLeader = -r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r)); +r -> (r != brokerToRemove && r != brokerWithUncleanShutdown) Review Comment: Can we update the comments to reflect the new logic? ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; Review Comment: It looks like that in order to work correctly, we need to make sure that ISR and ELR are always disjoint sets (otherwise we'd generate multiple records for the same partition). Can we add a comment about this? ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.apache.kafka.metadata.Replicas.NONE; + +public class BrokersToElrs { +private final SnapshotRegistry snapshotRegistry; + +private final TimelineHashMap> elrMembers; Review Comment: Can we add a comment what's mapped to what here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [WIP] KAFKA-8677: Flaky test testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl [kafka]
anatasiavela closed pull request #7267: [WIP] KAFKA-8677: Flaky test testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl URL: https://github.com/apache/kafka/pull/7267 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1399822846 ## streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java: ## @@ -91,11 +91,13 @@ private KeyValueStore maybeWrapLogging(final KeyValueStore, Bytes, byte[]> Review Comment: was this related to the changes in this PR? if not, let's leave it out for now, especially as we'd probably want to clean it up a bit further and change the `wrapped` invocations to `super` throughout the class but feel free to do a followup PR with this! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1399821160 ## streams/src/main/java/org/apache/kafka/streams/state/DslStoreSuppliers.java: ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import java.util.Map; +import org.apache.kafka.common.Configurable; + +/** + * {@code DslStoreSuppliers} defines a grouping of factories to construct + * stores for each of the types of state store implementations in Kafka + * Streams. This allows configuration of a default store supplier beyond + * the builtin defaults of RocksDB and In-Memory. Review Comment: This looks great, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Increase the delta for a flaky quota test [kafka]
harinirajendran opened a new pull request, #14806: URL: https://github.com/apache/kafka/pull/14806 Flaky test failing with assertion error just outside the defined threshold. So, bumping up the delta for quota by 1. java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 37.50937734433608 (600 connections / 15.996 sec) ==> expected: <30.0> but was: <37.50937734433608> Fixed in `trunk` via https://github.com/apache/kafka/pull/14805 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1399820405 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java: ## @@ -18,16 +18,30 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.state.DslStoreSuppliers; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import java.util.Map; public class StreamJoinedInternal extends StreamJoined { +private final InternalStreamsBuilder builder; + //Needs to be public for testing -public StreamJoinedInternal(final StreamJoined streamJoined) { +public StreamJoinedInternal( +final StreamJoined streamJoined, +final InternalStreamsBuilder builder +) { super(streamJoined); +this.builder = builder; +if (dslStoreSuppliers == null) { +final TopologyConfig topologyConfig = builder.internalTopologyBuilder.topologyConfigs(); +if (topologyConfig != null) { +dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null); +} Review Comment: Hm...is this logic correct in the context of the initialization logic in [OuterStreamJoinedStoreFactory](https://github.com/apache/kafka/pull/14648/files#r1399812871) My read of that code, specifically case 1, was that we interpret `streamJoined#dslStoreSuppliers` as what the user passed in to the StreamJoined, so it should be null (and stay null) if the user didn't explicitly configure a dslStoreSuppliers. Then in case 3 (or the else branch of case 2), we fall back to the config value if neither the `dslStoreSuppliers` or the `thisStoreSupplier` of the StreamJoined was set/non-null ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java: ## @@ -18,16 +18,30 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.state.DslStoreSuppliers; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import java.util.Map; public class StreamJoinedInternal extends StreamJoined { +private final InternalStreamsBuilder builder; + //Needs to be public for testing -public StreamJoinedInternal(final StreamJoined streamJoined) { +public StreamJoinedInternal( +final StreamJoined streamJoined, +final InternalStreamsBuilder builder +) { super(streamJoined); +this.builder = builder; +if (dslStoreSuppliers == null) { +final TopologyConfig topologyConfig = builder.internalTopologyBuilder.topologyConfigs(); +if (topologyConfig != null) { +dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null); +} Review Comment: Hm...is this logic correct in the context of the initialization logic in [OuterStreamJoinedStoreFactory](https://github.com/apache/kafka/pull/14648/files#r1399812871)? My read of that code, specifically case 1, was that we interpret `streamJoined#dslStoreSuppliers` as what the user passed in to the StreamJoined, so it should be null (and stay null) if the user didn't explicitly configure a dslStoreSuppliers. Then in case 3 (or the else branch of case 2), we fall back to the config value if neither the `dslStoreSuppliers` or the `thisStoreSupplier` of the StreamJoined was set/non-null -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1399812871 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java: ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; +import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; + +import java.time.Duration; +import java.util.Map; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.state.DslKeyValueParams; +import org.apache.kafka.streams.state.DslStoreSuppliers; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde; +import org.apache.kafka.streams.state.internals.ListValueStoreBuilder; +import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde; + +public class OuterStreamJoinStoreFactory extends AbstractConfigurableStoreFactory { + +private final String name; +private final StreamJoinedInternal streamJoined; +private final JoinWindows windows; +private final DslStoreSuppliers streamJoinedDslStoreSuppliers; + +private boolean loggingEnabled; + +public enum Type { +RIGHT, +LEFT +} + +public OuterStreamJoinStoreFactory( +final String name, +final StreamJoinedInternal streamJoined, +final JoinWindows windows, +final Type type +) { +super(streamJoined.dslStoreSuppliers()); + +// we store this one manually instead of relying on super#dslStoreSuppliers() +// so that we can differentiate between one that was explicitly passed in and +// one that was configured via super#configure() +this.streamJoinedDslStoreSuppliers = streamJoined.dslStoreSuppliers(); +this.name = buildOuterJoinWindowStoreName(streamJoined, name, type) + "-store"; +this.streamJoined = streamJoined; +this.windows = windows; +this.loggingEnabled = streamJoined.loggingEnabled(); +} + +@Override +public StateStore build() { +final Duration retentionPeriod = Duration.ofMillis(retentionPeriod()); +final Duration windowSize = Duration.ofMillis(windows.size()); +final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); +final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); +final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize"); +final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix); + +if (retentionMs < 0L) { +throw new IllegalArgumentException("retentionPeriod cannot be negative"); +} +if (windowSizeMs < 0L) { +throw new IllegalArgumentException("windowSize cannot be negative"); +} +if (windowSizeMs > retentionMs) { +throw new IllegalArgumentException("The retention period of the window store " ++ name + " must be no smaller than its window size. Got size=[" ++ windowSizeMs + "], retention=[" + retentionMs + "]"); +} + +final TimestampedKeyAndJoinSideSerde timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde()); +final LeftOrRightValueSerde leftOrR
[PR] MINOR: Increase the delta for a flaky quota test [kafka]
harinirajendran opened a new pull request, #14805: URL: https://github.com/apache/kafka/pull/14805 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on PR #14648: URL: https://github.com/apache/kafka/pull/14648#issuecomment-1819878453 Haha yeah it's certainly grown quite a lot over the past few updates -- but for that reason I'm actually fine with keeping it in one PR. The first time I reviewed it there was significantly fewer LOC and I've just been looking at the updates since then. So no worries (If you just threw this PR at me right out of the gate, that'd be a different story lol) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1399779288 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1632,10 +1729,65 @@ public void shouldHandleRangeQuery( IllegalArgumentException.class, queryResult.get(partition)::getFailureMessage ); - try (final KeyValueIterator iterator = queryResult.get(partition).getResult()) { while (iterator.hasNext()) { - actualValues.add(valueExtactor.apply(iterator.next().value)); +actualValues.add((Integer) iterator.next().value); +} +} +assertThat(queryResult.get(partition).getExecutionInfo(), is(empty())); +} +assertThat("Result:" + result, actualValues, is(expectedValues)); +assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION)); +} +} + +public void shouldHandleTimestampedRangeQuery( +final Optional lower, +final Optional upper, +final boolean isKeyAscending, +final List expectedValues) { Review Comment: Cf my comment above -- you need to set the right timestamps for each result. But it should be deterministic because the used ts are set by the test when input data is written. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1399777340 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1584,7 +1646,43 @@ public void shouldHandleKeyQuery( ); final V result1 = queryResult.getResult(); -final Integer integer = valueExtactor.apply(result1); +final Integer integer = (Integer) result1; +assertThat(integer, is(expectedValue)); +assertThat(queryResult.getExecutionInfo(), is(empty())); +assertThat(queryResult.getPosition(), is(POSITION_0)); +} + +public void shouldHandleTimestampedKeyQuery( +final Integer key, +final Integer expectedValue) { Review Comment: We should compare both (what happens automatically if you compare `ValueAndTimestamp` type). Thus, you need to set the right timestamp on expected `ValueAndTimestamp`. The ts comes from the data, and is set in `before()` to `WINDOW_START + Duration.ofMinutes(2).toMillis() * i` when input data is written. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15801: WIP Add hostname and port in NetworkClient logging and increase connection issues logging severity. [kafka]
GianlucaPrincipini commented on PR #14804: URL: https://github.com/apache/kafka/pull/14804#issuecomment-1819828622 Any suggestions to retrieve port in the best possible way? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [KAFKA-15801][WIP] Add hostname and port in netword NetworkClient logging and increase connection issues severity [kafka]
GianlucaPrincipini opened a new pull request, #14804: URL: https://github.com/apache/kafka/pull/14804 ## Description When a component of the Kafka broker tries to reach another broker within the cluster the logging should be more elaborate and include the IP/hostname and port it tries to connect to, and have a higher severity WARN rather than INFO. Furthermore, the hostname has been added to all logging in NetworkClient. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
lihaosky commented on code in PR #14605: URL: https://github.com/apache/kafka/pull/14605#discussion_r1399755568 ## streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.test; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; +import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class MockFixedKeyProcessorContextTest { Review Comment: Can you parameterize `MockProcessorContextAPITest`? Most of the code here seems to be the same as what's in `MockProcessorContextAPITest`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Update Streams API broker compat table [kafka]
JimGalasyn opened a new pull request, #14803: URL: https://github.com/apache/kafka/pull/14803 Update the table for v3.6. Also needs to be cherry-picked to the 3.6 branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
xvrl commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1399702579 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricValueProvider; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Frequencies; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * All metrics implement the {@link MetricValueProvider} interface. They are divided into + * two base types: + * + * + * {@link Gauge} + * {@link Measurable} + * + * + * {@link Gauge Gauges} can have any value, but we only collect metrics with number values. + * {@link Measurable Measurables} are divided into simple types with single values + * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate}, + * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies}, + * {@link Meter}, and {@link Percentiles}). + * + * + * + * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It + * should be a bug if a count metric decreases. + * + * + * + * Total and Sum are treated as a monotonically increasing counter. The javadocs for Total metric type + * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in + * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying + * Meter type is mostly a Total of a Count metric. + * We can assume that a Total metric always increases (but it is not guaranteed as the sample values might be both + * negative or positive). + * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example. + * + * + * + * The Sum as it is a sample sum which is not a cumulative metric. It is converted to GAUGE_DOUBLE. + * + * + * + * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types + * which are reported. A compound metric is never reported as-is. + * + * + * + * A Meter metric is always created with and reported as 2 metrics: a rate and a count. For eg: + * org.apache.kafka.common.network.Selector has Meter metric for "connection-close" but it has to be + * created with a "connection-close-rate" metric of type rate and a "connection-close-total" + * metric of type total. + * + * + * + * Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each + * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric + * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the + * compound type, each component measurables is converted to a GAUGE_DOUBLE. + * + * + * + * Percentiles work the same way as Frequencies. The only difference is that it is compose
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1399670915 ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} quota.record(records.sizeInBytes) logAppendInfo } + // Visible for testing + def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: DirectoryEventRequestState): Unit = { +assignmentRequestStates.put(topicPartition, state) + } + private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = { +val partitionRequestState = Option(assignmentRequestStates.get(topicPartition)) +val topicId = partition.topicId +if (topicId.isEmpty) + throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists but its ID doesn't exist.") Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1399669761 ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} quota.record(records.sizeInBytes) logAppendInfo } + // Visible for testing + def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: DirectoryEventRequestState): Unit = { +assignmentRequestStates.put(topicPartition, state) + } + private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = { +val partitionRequestState = Option(assignmentRequestStates.get(topicPartition)) +val topicId = partition.topicId +if (topicId.isEmpty) + throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists but its ID doesn't exist.") + +partitionRequestState match { + case None => +// Schedule assignment request and don't promote the future replica yet until the controller accepted the request. +partition.maybeFutureReplicaCaughtUp(_ => { + partition.futureReplicaDirectoryId() +.map { + directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), _, +updatedAssignmentRequestStat(topicPartition)(_)) +} +}) + case Some(DirectoryEventRequestState.COMPLETED) => +// Promote future replica if controller accepted the request and the replica caught-up with the original log. +if (partition.maybeReplaceCurrentWithFutureReplica()) { + removePartitions(Set(topicPartition)) + assignmentRequestStates.remove(topicPartition) +} + case _ => +log.info("Waiting for AssignmentRequest to succeed before promoting the future replica.") Review Comment: changed the log level to trace -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1399667766 ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} quota.record(records.sizeInBytes) logAppendInfo } + // Visible for testing + def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: DirectoryEventRequestState): Unit = { +assignmentRequestStates.put(topicPartition, state) + } + private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = { +val partitionRequestState = Option(assignmentRequestStates.get(topicPartition)) +val topicId = partition.topicId +if (topicId.isEmpty) + throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists but its ID doesn't exist.") + +partitionRequestState match { + case None => +// Schedule assignment request and don't promote the future replica yet until the controller accepted the request. +partition.maybeFutureReplicaCaughtUp(_ => { + partition.futureReplicaDirectoryId() +.map { + directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), _, +updatedAssignmentRequestStat(topicPartition)(_)) Review Comment: on second thought, I changed the signature and structure of these callbacks and did some refactoring. Now AssignmentManager just runs the callback as runnable when the assignment is completed without explicitly sending the state `COMPLETE` to the callback method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15816: Fix leaked sockets in streams tests [kafka]
gharris1727 commented on code in PR #14769: URL: https://github.com/apache/kafka/pull/14769#discussion_r1399656534 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java: ## @@ -418,6 +418,8 @@ public String metricsScope() { }) ); +// do not use the harness streams Review Comment: I changed this, let me know if the new comment makes more sense or if you'd like it removed. I was indicating that the KafkaStreams that the test harness set up for the test won't be used, and a different KafkaStreams was being substituted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1399656268 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1632,10 +1729,65 @@ public void shouldHandleRangeQuery( IllegalArgumentException.class, queryResult.get(partition)::getFailureMessage ); - try (final KeyValueIterator iterator = queryResult.get(partition).getResult()) { while (iterator.hasNext()) { - actualValues.add(valueExtactor.apply(iterator.next().value)); +actualValues.add((Integer) iterator.next().value); +} +} +assertThat(queryResult.get(partition).getExecutionInfo(), is(empty())); +} +assertThat("Result:" + result, actualValues, is(expectedValues)); +assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION)); +} +} + +public void shouldHandleTimestampedRangeQuery( +final Optional lower, +final Optional upper, +final boolean isKeyAscending, +final List expectedValues) { Review Comment: It seem every time the timestamp of ` ValueAndTimestamp` is change, so we cannot compare ` ValueAndTimestamp`. ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1632,10 +1729,65 @@ public void shouldHandleRangeQuery( IllegalArgumentException.class, queryResult.get(partition)::getFailureMessage ); - try (final KeyValueIterator iterator = queryResult.get(partition).getResult()) { while (iterator.hasNext()) { - actualValues.add(valueExtactor.apply(iterator.next().value)); +actualValues.add((Integer) iterator.next().value); +} +} +assertThat(queryResult.get(partition).getExecutionInfo(), is(empty())); +} +assertThat("Result:" + result, actualValues, is(expectedValues)); +assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION)); +} +} + +public void shouldHandleTimestampedRangeQuery( +final Optional lower, +final Optional upper, +final boolean isKeyAscending, +final List expectedValues) { Review Comment: It seems every time the timestamp of ` ValueAndTimestamp` is change, so we cannot compare ` ValueAndTimestamp`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1399652671 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1584,7 +1646,43 @@ public void shouldHandleKeyQuery( ); final V result1 = queryResult.getResult(); -final Integer integer = valueExtactor.apply(result1); +final Integer integer = (Integer) result1; +assertThat(integer, is(expectedValue)); +assertThat(queryResult.getExecutionInfo(), is(empty())); +assertThat(queryResult.getPosition(), is(POSITION_0)); +} + +public void shouldHandleTimestampedKeyQuery( +final Integer key, +final Integer expectedValue) { + +final TimestampedKeyQuery query = TimestampedKeyQuery.withKey(key); +final StateQueryRequest> request = +inStore(STORE_NAME) +.withQuery(query) +.withPartitions(mkSet(0, 1)) +.withPositionBound(PositionBound.at(INPUT_POSITION)); + +final StateQueryResult> result = +IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); +final QueryResult> queryResult = result.getOnlyPartitionResult(); +if (queryResult == null) { +throw new AssertionError("cannot use this query type to query result"); +} +final boolean failure = queryResult.isFailure(); +if (failure) { +throw new AssertionError(queryResult.toString()); +} +assertThat(queryResult.isSuccess(), is(true)); + +assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); +assertThrows( +IllegalArgumentException.class, +queryResult::getFailureMessage +); + +final ValueAndTimestamp result1 = queryResult.getResult(); Review Comment: ok ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1584,7 +1646,43 @@ public void shouldHandleKeyQuery( ); final V result1 = queryResult.getResult(); -final Integer integer = valueExtactor.apply(result1); +final Integer integer = (Integer) result1; +assertThat(integer, is(expectedValue)); +assertThat(queryResult.getExecutionInfo(), is(empty())); +assertThat(queryResult.getPosition(), is(POSITION_0)); +} + +public void shouldHandleTimestampedKeyQuery( +final Integer key, +final Integer expectedValue) { + +final TimestampedKeyQuery query = TimestampedKeyQuery.withKey(key); +final StateQueryRequest> request = +inStore(STORE_NAME) +.withQuery(query) +.withPartitions(mkSet(0, 1)) +.withPositionBound(PositionBound.at(INPUT_POSITION)); + +final StateQueryResult> result = +IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); +final QueryResult> queryResult = result.getOnlyPartitionResult(); +if (queryResult == null) { +throw new AssertionError("cannot use this query type to query result"); +} +final boolean failure = queryResult.isFailure(); +if (failure) { +throw new AssertionError(queryResult.toString()); +} +assertThat(queryResult.isSuccess(), is(true)); + +assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); +assertThrows( +IllegalArgumentException.class, +queryResult::getFailureMessage +); + +final ValueAndTimestamp result1 = queryResult.getResult(); +final Integer integer = (Integer) result1.value(); assertThat(integer, is(expectedValue)); Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1399632055 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1584,7 +1646,43 @@ public void shouldHandleKeyQuery( ); final V result1 = queryResult.getResult(); -final Integer integer = valueExtactor.apply(result1); +final Integer integer = (Integer) result1; +assertThat(integer, is(expectedValue)); +assertThat(queryResult.getExecutionInfo(), is(empty())); +assertThat(queryResult.getPosition(), is(POSITION_0)); +} + +public void shouldHandleTimestampedKeyQuery( +final Integer key, +final Integer expectedValue) { Review Comment: But in the end we should compare the value in `ValueAndTimestamp`? so we can set the the timestamp of `ValueAndTimestamp` -1? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1399632055 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1584,7 +1646,43 @@ public void shouldHandleKeyQuery( ); final V result1 = queryResult.getResult(); -final Integer integer = valueExtactor.apply(result1); +final Integer integer = (Integer) result1; +assertThat(integer, is(expectedValue)); +assertThat(queryResult.getExecutionInfo(), is(empty())); +assertThat(queryResult.getPosition(), is(POSITION_0)); +} + +public void shouldHandleTimestampedKeyQuery( +final Integer key, +final Integer expectedValue) { Review Comment: But in the end we should compare the value in `ValueAndTimestamp`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1399627999 ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} quota.record(records.sizeInBytes) logAppendInfo } + // Visible for testing + def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: DirectoryEventRequestState): Unit = { +assignmentRequestStates.put(topicPartition, state) + } + private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = { +val partitionRequestState = Option(assignmentRequestStates.get(topicPartition)) +val topicId = partition.topicId +if (topicId.isEmpty) + throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists but its ID doesn't exist.") + +partitionRequestState match { + case None => +// Schedule assignment request and don't promote the future replica yet until the controller accepted the request. +partition.maybeFutureReplicaCaughtUp(_ => { + partition.futureReplicaDirectoryId() +.map { + directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), _, +updatedAssignmentRequestStat(topicPartition)(_)) Review Comment: updated this, now `AssignmentManager` will only update the state to `COMPLETED` and `QUEUED` will be set immediately here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1399627137 ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} quota.record(records.sizeInBytes) logAppendInfo } + // Visible for testing + def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: DirectoryEventRequestState): Unit = { +assignmentRequestStates.put(topicPartition, state) + } + private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = { +val partitionRequestState = Option(assignmentRequestStates.get(topicPartition)) +val topicId = partition.topicId +if (topicId.isEmpty) + throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists but its ID doesn't exist.") + +partitionRequestState match { + case None => +// Schedule assignment request and don't promote the future replica yet until the controller accepted the request. +partition.maybeFutureReplicaCaughtUp(_ => { + partition.futureReplicaDirectoryId() +.map { + directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), _, +updatedAssignmentRequestStat(topicPartition)(_)) +} +}) + case Some(DirectoryEventRequestState.COMPLETED) => +// Promote future replica if controller accepted the request and the replica caught-up with the original log. +if (partition.maybeReplaceCurrentWithFutureReplica()) { + removePartitions(Set(topicPartition)) + assignmentRequestStates.remove(topicPartition) +} + case _ => Review Comment: I updated the pr to only deal with `QUEUED` and `COMPLETED`. And dropped `DISPATCHED` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1399617212 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1584,7 +1646,43 @@ public void shouldHandleKeyQuery( ); final V result1 = queryResult.getResult(); -final Integer integer = valueExtactor.apply(result1); +final Integer integer = (Integer) result1; +assertThat(integer, is(expectedValue)); +assertThat(queryResult.getExecutionInfo(), is(empty())); +assertThat(queryResult.getPosition(), is(POSITION_0)); +} + +public void shouldHandleTimestampedKeyQuery( +final Integer key, +final Integer expectedValue) { Review Comment: I think we should pass in `ValueAndTimestamp` as expected result and also verify that we get the correct timestamp back. (Also rename `expectedValue` to `expectedValueAndTimestamp`.) ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1584,7 +1646,43 @@ public void shouldHandleKeyQuery( ); final V result1 = queryResult.getResult(); -final Integer integer = valueExtactor.apply(result1); +final Integer integer = (Integer) result1; +assertThat(integer, is(expectedValue)); +assertThat(queryResult.getExecutionInfo(), is(empty())); +assertThat(queryResult.getPosition(), is(POSITION_0)); +} + +public void shouldHandleTimestampedKeyQuery( +final Integer key, +final Integer expectedValue) { + +final TimestampedKeyQuery query = TimestampedKeyQuery.withKey(key); +final StateQueryRequest> request = +inStore(STORE_NAME) +.withQuery(query) +.withPartitions(mkSet(0, 1)) +.withPositionBound(PositionBound.at(INPUT_POSITION)); + +final StateQueryResult> result = +IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); +final QueryResult> queryResult = result.getOnlyPartitionResult(); +if (queryResult == null) { +throw new AssertionError("cannot use this query type to query result"); +} +final boolean failure = queryResult.isFailure(); +if (failure) { +throw new AssertionError(queryResult.toString()); +} +assertThat(queryResult.isSuccess(), is(true)); + +assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); +assertThrows( +IllegalArgumentException.class, +queryResult::getFailureMessage +); + +final ValueAndTimestamp result1 = queryResult.getResult(); Review Comment: `result1` is a very bad name. Rename to `valueAndTimestamp`. ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1584,7 +1646,43 @@ public void shouldHandleKeyQuery( ); final V result1 = queryResult.getResult(); -final Integer integer = valueExtactor.apply(result1); +final Integer integer = (Integer) result1; +assertThat(integer, is(expectedValue)); +assertThat(queryResult.getExecutionInfo(), is(empty())); +assertThat(queryResult.getPosition(), is(POSITION_0)); +} + +public void shouldHandleTimestampedKeyQuery( +final Integer key, +final Integer expectedValue) { + +final TimestampedKeyQuery query = TimestampedKeyQuery.withKey(key); +final StateQueryRequest> request = +inStore(STORE_NAME) +.withQuery(query) +.withPartitions(mkSet(0, 1)) +.withPositionBound(PositionBound.at(INPUT_POSITION)); + +final StateQueryResult> result = +IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); +final QueryResult> queryResult = result.getOnlyPartitionResult(); +if (queryResult == null) { +throw new AssertionError("cannot use this query type to query result"); +} +final boolean failure = queryResult.isFailure(); +if (failure) { +throw new AssertionError(queryResult.toString()); +} +assertThat(queryResult.isSuccess(), is(true)); + +assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); +assertThrows( +IllegalArgumentException.class, +queryResult::getFailureMessage +); + +final ValueAndTimestamp result1 = queryResult.getResult(); +final Integer integer = (Integer) result1.value(); assertThat(integer, is(expectedValue)); Review Comment: Cf my comments above: I think this can just ` assertThat(valueAndTimestamp, is(expectedValueAndTimestamp));` ###
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1399605671 ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} Review Comment: Which callback you are referring to here? The order of checking partition future replica is different between DirectoryEventHandler.NOOP and when we have a handler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Small LogValidator clean ups [kafka]
hachikuji merged PR #14697: URL: https://github.com/apache/kafka/pull/14697 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Small LogValidator clean ups [kafka]
hachikuji commented on PR #14697: URL: https://github.com/apache/kafka/pull/14697#issuecomment-1819609057 @junrao I triggered a few builds and didn't see much consistency in the test failures. I tried some of the tests locally and they passed. I will go ahead and merge to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1397782005 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -609,17 +842,18 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records +// push one item to the other window that has a join; +// this should produce the joined record first; +// then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "A0+null", 0L), -new KeyValueTimestamp<>(1, "A1+a1", 110L) +new KeyValueTimestamp<>(1, "A1+a1", 110L), +new KeyValueTimestamp<>(0, "A0+null", 0L) Review Comment: This is because previously we look at outer store and then join. But this change make it we join first and then look at outer store. The ts in outer store and other store is hard to reason. If we change the ts of 0 to be 100 and ts of 1 to be 50, the original test would still produce 0 first which has larger ts... So unless we compare the ts of join and outer at the same time when we output, we can't guarantee the order of ts when output. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1399579994 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -327,10 +333,12 @@ public ControllerResult registerBroker( ", but got cluster ID " + request.clusterId()); } int brokerId = request.brokerId(); +List records = new ArrayList<>(); BrokerRegistration existing = brokerRegistrations.get(brokerId); -if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) { -// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown. -log.debug("Received an unclean shutdown request"); Review Comment: Actually, the server may receive old registration requests due to poor network conditions or during rolling upgrades. I think we should also treat the old version requests as unclean shutdowns. ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -327,10 +333,12 @@ public ControllerResult registerBroker( ", but got cluster ID " + request.clusterId()); } int brokerId = request.brokerId(); +List records = new ArrayList<>(); BrokerRegistration existing = brokerRegistrations.get(brokerId); -if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) { -// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown. -log.debug("Received an unclean shutdown request"); Review Comment: Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1399575912 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -1294,8 +1315,11 @@ void handleBrokerFenced(int brokerId, List records) { */ void handleBrokerUnregistered(int brokerId, long brokerEpoch, List records) { -generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, records, -brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); +generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, NO_LEADER, records, +new PartitionsOnReplicaIteratorChain(Arrays.asList( Review Comment: Removed. ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -466,6 +466,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) { partition.lastKnownElr[0] != partition.leader)) { // Only update the last known leader when the first time the partition becomes leaderless. record.setLastKnownELR(Arrays.asList(partition.leader)); +} else if ((record.leader() != NO_LEADER && record.leader() != NO_LEADER_CHANGE || partition.leader != NO_LEADER) Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15022: Detect negative cycle from one source [kafka]
lihaosky commented on code in PR #14696: URL: https://github.com/apache/kafka/pull/14696#discussion_r1399571144 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java: ## @@ -117,6 +131,8 @@ private Graph(final boolean isResidualGraph) { } public void addEdge(final V u, final V v, final int capacity, final int cost, final int flow) { +Objects.requireNonNull(u); +Objects.requireNonNull(v); Review Comment: `null` is a special node we introduced in graph as dummy source to detect negative cycle. So users can use it ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java: ## @@ -103,8 +104,21 @@ public String toString() { } } -private final SortedMap> adjList = new TreeMap<>(); -private final SortedSet nodes = new TreeSet<>(); +private class KeyComparator implements Comparator { + +@Override +public int compare(final V o1, final V o2) { +if (o1 == null || o2 == null) { +return -1; Review Comment: Good point, if both are `null`, we can return 0; -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15705: Add integration tests for Heartbeat API and GroupLeave API [kafka]
dongnuo123 commented on code in PR #14656: URL: https://github.com/apache/kafka/pull/14656#discussion_r1399570354 ## core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala: ## @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol +import org.apache.kafka.common.message.SyncGroupRequestData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import java.util.Collections +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testHeartbeatWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testHeartbeat() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testHeartbeatWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { +testHeartbeat() + } + + private def testHeartbeat(): Unit = { +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +createOffsetsTopic() + +// Create the topic. +createTopic( + topic = "foo", + numPartitions = 3 +) + +for (version <- ApiKeys.HEARTBEAT.oldestVersion() to ApiKeys.HEARTBEAT.latestVersion(isUnstableApiEnabled)) { + val metadata = ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo")) + ).array + + val (leaderMemberId, leaderEpoch) = joinDynamicConsumerGroupWithOldProtocol( +groupId = "grp", +metadata = metadata, +completeRebalance = false + ) + + // Heartbeat with unknown group id and unknown member id. + heartbeat( +groupId = "grp-unknown", +memberId = "member-id-unknown", +generationId = -1, +expectedError = Errors.UNKNOWN_MEMBER_ID, +version = version.toShort + ) + + // Heartbeat with unknown group id. + heartbeat( +groupId = "grp-unknown", +memberId = leaderMemberId, +generationId = -1, +expectedError = Errors.UNKNOWN_MEMBER_ID, +version = version.toShort + ) + + // Heartbeat with unknown member id. + heartbeat( +groupId = "grp", +memberId = "member-id-unknown", +generationId = -1, +expectedError = Errors.UNKNOWN_MEMBER_ID, +version = version.toShort + ) + + // Heartbeat with unmatched generation id. + heartbeat( +groupId = "grp", +memberId = leaderMemberId, +generationId = -1, +expectedError = Errors.ILLEGAL_GENERATION, +version = version.toShort + ) + + // Heartbeat COMPLETING_REBALANCE group. + heartbeat( +groupId = "grp", +memberId = leaderMemberId, +generationId = leaderEpoch, +version = version.toShort + ) + +
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1399564930 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -210,6 +220,9 @@ public void run() throws Exception { channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder( buildRequestData(brokerId, brokerEpochSupplier.get(), assignment)), new AssignReplicasToDirsRequestCompletionHandler()); +inflight.values().stream() +.filter(assignmentEvent -> assignmentEvent.callback != null) +.forEach(assignmentEvent -> assignmentEvent.callback.accept(DirectoryEventRequestState.DISPATCHED)); Review Comment: For dispatched this is true but for queued I think we still need this so we don't send the reassignment multiple times to AssignmentManager. Especially that as far as I see the manager isn't idempotent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1399561823 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -791,7 +791,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(1, "null+a1", 102L) +new KeyValueTimestamp<>(1, "null+a1", 102L) Review Comment: revert this? ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -878,16 +880,18 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { processor.checkAndClearProcessResult(); -// push one item to the first stream; this should produce one full-join item +// push one item to the first stream; +// this should produce one inner-join item; +// and a right-joined item for a3 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) } -// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 } +// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) } time += 100; inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(2, "A2+a2", 201L) +new KeyValueTimestamp<>(2, "A2+a2", 201L) Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1399559787 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -146,6 +153,9 @@ public void run() throws Exception { log.debug("Received new assignment {}", this); } pending.put(partition, this); +if (callback != null) { +callback.accept(DirectoryEventRequestState.QUEUED); +} Review Comment: I think the Dispatched state is the questionable one. We may not need it and we keep the reassignment queued until it it is completed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1399558750 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -146,6 +153,9 @@ public void run() throws Exception { log.debug("Received new assignment {}", this); } pending.put(partition, this); +if (callback != null) { +callback.accept(DirectoryEventRequestState.QUEUED); +} Review Comment: We need to mark the request has been sent to we don't send it again and just wait on the state to be `COMPLETED` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
junrao commented on PR #14632: URL: https://github.com/apache/kafka/pull/14632#issuecomment-1819538489 @apoorvmittal10 : Thanks for triaging the failed tests. There is still no green build though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org