Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1619996598 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l } } + +@Test +public void testCopyQuotaManagerConfig() { +Properties defaultProps = new Properties(); +RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); +RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); +assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond()); +assertEquals(61, defaultConfig.numQuotaSamples()); +assertEquals(1, defaultConfig.quotaWindowSizeSeconds()); + +Properties customProps = new Properties(); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); +RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); +RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig); +assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); +assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); +assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); +} + +@Test +public void testFetchQuotaManagerConfig() { +Properties defaultProps = new Properties(); +RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); +RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); +assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond()); +assertEquals(11, defaultConfig.numQuotaSamples()); +assertEquals(1, defaultConfig.quotaWindowSizeSeconds()); + +Properties customProps = new Properties(); + 
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); +RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); +RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig); +assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); +assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); +assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds()); +} + +@Test +public void testCopyQuotaNotExceeded() throws Exception { +long oldSegmentStartOffset = 0L; +long nextSegmentStartOffset = 150L; + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + +// leader epoch preparation +checkpoint.write(totalEpochEntries); +LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); +when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); + +// create 2 log segments, with 0 and 150 as log start offset +LogSegment oldSegment = mock(LogSegment.class); +LogSegment activeSegment = mock(LogSegment.class); + +File tempFile = TestUtils.tempFile(); +FileRecords fileRecords = mock(FileRecords.class); +when(fileRecords.file()).thenReturn(tempFile); +when(fileRecords.sizeInBytes()).thenReturn(10); + +// Set up the segment that will be copied +when(oldSegment.log()).thenReturn(fileRecords); +when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset); +when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset); + +// set up the active segment +when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset); + 
+when(mockLog.activeSegment()).thenReturn(activeSegment); +when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset); +when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment))); + +File mockProducerSnapshotIndex = TestUtils.tempFile(); +ProducerStateManager mockStateManager = mock(ProducerStateManager.class); + when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex)); + +when(mockLog.producer
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1619996273 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l } } + +@Test +public void testCopyQuotaManagerConfig() { +Properties defaultProps = new Properties(); +RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); +RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); +assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond()); +assertEquals(61, defaultConfig.numQuotaSamples()); +assertEquals(1, defaultConfig.quotaWindowSizeSeconds()); + +Properties customProps = new Properties(); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); +RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); +RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig); +assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); +assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); +assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); +} + +@Test +public void testFetchQuotaManagerConfig() { +Properties defaultProps = new Properties(); +RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); +RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); +assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond()); +assertEquals(11, defaultConfig.numQuotaSamples()); +assertEquals(1, defaultConfig.quotaWindowSizeSeconds()); + +Properties customProps = new Properties(); + 
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31); + customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); +RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); +RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig); +assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); +assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); +assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds()); +} + +@Test +public void testCopyQuotaNotExceeded() throws Exception { +long oldSegmentStartOffset = 0L; +long nextSegmentStartOffset = 150L; + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + +// leader epoch preparation +checkpoint.write(totalEpochEntries); +LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); +when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); + +// create 2 log segments, with 0 and 150 as log start offset +LogSegment oldSegment = mock(LogSegment.class); +LogSegment activeSegment = mock(LogSegment.class); + +File tempFile = TestUtils.tempFile(); +FileRecords fileRecords = mock(FileRecords.class); +when(fileRecords.file()).thenReturn(tempFile); +when(fileRecords.sizeInBytes()).thenReturn(10); + +// Set up the segment that will be copied +when(oldSegment.log()).thenReturn(fileRecords); +when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset); +when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset); + +// set up the active segment +when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset); + 
+when(mockLog.activeSegment()).thenReturn(activeSegment); +when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset); +when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment))); + +File mockProducerSnapshotIndex = TestUtils.tempFile(); +ProducerStateManager mockStateManager = mock(ProducerStateManager.class); + when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex)); + +when(mockLog.producer
[jira] [Commented] (KAFKA-16863) Consider removing `default.` prefix for exception handler config
[ https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850587#comment-17850587 ]

A. Sophie Blee-Goldman commented on KAFKA-16863:

If we're going to do a config name cleanup, maybe we can do a pass and fix all the current issues. For example, I just happened to notice that two of the new rack-aware assignment configs use underscores in part of their config name, which does not follow the established pattern and is likely to be messed up easily by users who depend on config files or otherwise configure things without the aid of the Java config variable.

For example, "rack.aware.assignment.traffic_cost" should be "rack.aware.assignment.traffic.cost".

These are very new configs, so maybe the disruption isn't worth it. On the other hand, if we can make this change now, maybe we can fix it before too many people start using the new feature and these underscore-based configs?

> Consider removing `default.` prefix for exception handler config
>
>
> Key: KAFKA-16863
> URL: https://issues.apache.org/jira/browse/KAFKA-16863
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Trivial
> Labels: need-kip
>
> Kafka Streams has a set of configs with `default.` prefix. The intent for the default-prefix is to make a distinction between, well, the default, and in-place overwrites in the code. E.g., users can specify ts-extractors on a per-topic basis.
> However, for the deserialization- and production-exception handlers, no such overwrites are possible, and thus, `default.` does not really make sense, because there is just one handler overall. Via KIP-1033 we added a new processing-exception handler w/o a default-prefix, too.
> Thus, we should consider deprecating the two existing config names and adding them back w/o the `default.` prefix.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
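The naming point in the comment above can be illustrated with a small check. This is only a sketch: the class `ConfigNameSketch`, its methods, and the exact regular expression are hypothetical and not part of Kafka; the assumed convention is simply "lowercase alphanumeric words separated by single dots".

```java
import java.util.regex.Pattern;

// Illustrative sketch only: ConfigNameSketch is a hypothetical helper, not Kafka code.
final class ConfigNameSketch {
    // Assumed convention: lowercase alphanumeric words joined by single dots.
    private static final Pattern DOTTED = Pattern.compile("[a-z0-9]+(\\.[a-z0-9]+)*");

    // Returns true when the name uses only dot-separated lowercase words.
    static boolean followsDottedConvention(String name) {
        return DOTTED.matcher(name).matches();
    }

    // Rewrites underscores to dots, as suggested for the rack-aware configs.
    static String toDotted(String name) {
        return name.replace('_', '.');
    }

    public static void main(String[] args) {
        String current = "rack.aware.assignment.traffic_cost";
        System.out.println(followsDottedConvention(current));           // false
        System.out.println(toDotted(current));                          // rack.aware.assignment.traffic.cost
        System.out.println(followsDottedConvention(toDotted(current))); // true
    }
}
```

A check like this could run in a unit test over all declared config names, which would catch an underscore before a release rather than after users have adopted it.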
Re: [PR] MINOR: Prevent consumer protocol to be used in ZK mode [kafka]
dajac merged PR #16121: URL: https://github.com/apache/kafka/pull/16121 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
ableegoldman commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1619985405

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
Can't say what its original purpose was, but StreamsException has definitely morphed into a catch-all for exceptions throughout Streams. It's definitely not exclusive to the state of a task though (that would be ProcessorStateException). The nice thing about StreamsException is you can add other useful metadata such as the taskId where the error originated, so I always prefer to just throw the StreamsException. We also know for a fact that StreamsException will be caught and handled properly as it gets bubbled up. So I'd go for merging this into a single `catch RuntimeException` block, then wrap it in a StreamsException. And don't forget to add the task id too! (You can get it from the processor context.)
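The suggestion in the review comment above (one `catch RuntimeException` block that wraps the failure and attaches the task id) might look roughly like the following. This is a sketch under stated assumptions, not the code from the PR: `SerdeInitSketch`, `TaskScopedException`, and `initOrWrap` are hypothetical stand-ins so that the example compiles without Kafka on the classpath. In Streams itself the wrapper would be `StreamsException` and the id would come from the processor context.

```java
import java.util.function.Supplier;

// Illustrative sketch only: every name in this class is hypothetical.
final class SerdeInitSketch {
    // Stand-in for StreamsException: a runtime exception that carries a task id.
    static final class TaskScopedException extends RuntimeException {
        final String taskId;

        TaskScopedException(String message, Throwable cause, String taskId) {
            super(message, cause);
            this.taskId = taskId;
        }
    }

    // Runs one initialization step; any runtime failure is wrapped exactly once,
    // keeping the original exception as the cause and recording the task id.
    static <T> T initOrWrap(Supplier<T> prepare, String what, String nodeName, String taskId) {
        try {
            return prepare.get();
        } catch (RuntimeException e) {
            throw new TaskScopedException(
                String.format("Failed to initialize %s for sink node %s", what, nodeName), e, taskId);
        }
    }

    public static void main(String[] args) {
        try {
            initOrWrap(() -> { throw new IllegalStateException("no serde configured"); },
                "key serdes", "sink-1", "0_1");
        } catch (TaskScopedException e) {
            System.out.println(e.getMessage() + " (task " + e.taskId + ")");
        }
    }
}
```

Unlike the `ConfigException` branch in the quoted diff, this sketch passes the caught exception along as the cause, so the original stack trace is not lost when the error bubbles up.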
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1619973284

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
+public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
+public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being copied from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be copied per second.";
+public static final Long DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.copy.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote copy quota management. " +
+"The default value is 61, which means there are 60 whole windows + 1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 61;
+
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = "remote.log.manager.copy.quota.window.size.seconds";
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for remote copy quota management. " +
+"The default value is 1 second.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.fetch.max.bytes.per.second";
+public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be fetched from remote storage to local storage per second. " +
+"This is a global limit for all the partitions that are being fetched from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be fetched per second.";
+public static final Long DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.fetch.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote fetch quota management. " +
+"The default value is 11, which means there are 10 whole windows + 1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM = 11;

Review Comment:
Done
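The doc strings above describe the sample count as "N whole windows + 1 current window". A small worked sketch of that arithmetic follows. The class `QuotaWindowSketch` and both methods are hypothetical, and the byte-budget calculation is only an illustration of how the three settings relate; the actual quota manager's accounting may differ.

```java
// Illustrative sketch only: QuotaWindowSketch is hypothetical, not Kafka code.
final class QuotaWindowSketch {
    // Seconds covered by the completed windows; one sample is the in-progress window.
    static long wholeWindowSeconds(int numQuotaSamples, int quotaWindowSizeSeconds) {
        return (long) (numQuotaSamples - 1) * quotaWindowSizeSeconds;
    }

    // Bytes permitted across the completed windows at the configured rate.
    // Saturates at Long.MAX_VALUE so the "no limit" default does not overflow.
    static long byteBudget(long quotaBytesPerSecond, int numQuotaSamples, int quotaWindowSizeSeconds) {
        long seconds = wholeWindowSeconds(numQuotaSamples, quotaWindowSizeSeconds);
        try {
            return Math.multiplyExact(quotaBytesPerSecond, seconds);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        System.out.println(wholeWindowSeconds(61, 1));            // 60 (copy default)
        System.out.println(wholeWindowSeconds(11, 1));            // 10 (fetch default)
        System.out.println(byteBudget(100L, 31, 1));              // 3000
        System.out.println(byteBudget(Long.MAX_VALUE, 61, 1) == Long.MAX_VALUE); // true
    }
}
```

With the copy defaults (61 samples of 1 second) the rate is observed over 60 completed seconds plus the second in progress; the fetch defaults (11 samples) shorten that to 10 seconds.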
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619961360

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -18,29 +18,18 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 /**
- * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with

Review Comment:
That was for the subscription type, right, which is just one word? "Homogeneous uniform assignment builder" is kind of a mouthful. I used the wrong term; by "user" I meant anyone looking at the code.
[jira] [Commented] (KAFKA-16853) Split RemoteLogManagerScheduledThreadPool
[ https://issues.apache.org/jira/browse/KAFKA-16853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850578#comment-17850578 ]

Abhijeet Kumar commented on KAFKA-16853:

Thanks [~christo_lolov]. I will work on this.

> Split RemoteLogManagerScheduledThreadPool
>
>
> Key: KAFKA-16853
> URL: https://issues.apache.org/jira/browse/KAFKA-16853
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Christo Lolov
> Assignee: Abhijeet Kumar
> Priority: Major
>
> *Summary*
> To begin with, create just the RemoteDataExpirationThreadPool and move expiration to it. Keep all settings as if the only thread pool was the RemoteLogManagerScheduledThreadPool. Ensure that the new thread pool is wired correctly to the RemoteLogManager.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619937439 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619924951 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. -if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { Review Comment: nit: can we add Javadoc for this method?
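The quota arithmetic in the diff above (`minimumMemberQuota = totalPartitionsCount / numberOfMembers`, with the remainder counting the members who receive one extra partition) can be worked through in isolation. This is a minimal sketch, not the assignor itself: `QuotaSketch` and `targetSizes` are hypothetical names, and handing the extra partitions to the first members in iteration order is an assumption made for illustration. The real builder also has to account for each member's current assignment.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: QuotaSketch is hypothetical, not the Kafka assignor.
final class QuotaSketch {
    // Computes how many partitions each member should end up with.
    // Precondition: memberIds is non-empty (otherwise the division below fails).
    static Map<String, Integer> targetSizes(List<String> memberIds, int totalPartitionsCount) {
        int numberOfMembers = memberIds.size();
        int minimumMemberQuota = totalPartitionsCount / numberOfMembers;
        int remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;

        Map<String, Integer> sizes = new LinkedHashMap<>();
        for (String memberId : memberIds) {
            int size = minimumMemberQuota;
            // Assumption: the first members in iteration order take the extras.
            if (remainingMembersToGetAnExtraPartition > 0) {
                size++;
                remainingMembersToGetAnExtraPartition--;
            }
            sizes.put(memberId, size);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 10 partitions over 3 members: quota 3, one member gets an extra partition.
        System.out.println(targetSizes(List.of("a", "b", "c"), 10)); // {a=4, b=3, c=3}
    }
}
```

The sizes always sum to `totalPartitionsCount`, and no two members differ by more than one partition, which is the balance property the uniform assignor is aiming for.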
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]
ableegoldman commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1619920514 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final TaskInfo task, } return true; } + +private static Map tagBasedStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); +final Map tasksToRemainingStandbys = applicationState.allTasks().values().stream() +.collect(Collectors.toMap(TaskInfo::id, taskInfo -> numStandbyReplicas)); +final Map streamStates = applicationState.kafkaStreamsStates(false); + +final Set rackAwareAssignmentTags = new HashSet<>(getRackAwareAssignmentTags(applicationState)); +final TagStatistics tagStatistics = new TagStatistics(applicationState); + +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments); + +final Set statefulTaskIds = applicationState.allTasks().values().stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toSet()); +final Map clientsByUuid = kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap( +entry -> entry.getKey().id(), +Map.Entry::getValue +)); +final Function> clientTagGetter = createClientTagGetter(applicationState); + +final Map pendingStandbyTasksToClientId = new HashMap<>(); +for (final TaskId statefulTaskId : statefulTaskIds) { +for (final KafkaStreamsAssignment assignment : clientsByUuid.values()) { +if (assignment.tasks().containsKey(statefulTaskId)) { +assignStandbyTasksToClientsWithDifferentTags( +numStandbyReplicas, +standbyTaskClientsByTaskLoad, +statefulTaskId, +assignment.processId(), +rackAwareAssignmentTags, +streamStates, +kafkaStreamsAssignments, +tasksToRemainingStandbys, +tagStatistics.tagKeyToValues, 
+tagStatistics.tagEntryToClients, +pendingStandbyTasksToClientId, +clientTagGetter +); +} +} +} + +if (!tasksToRemainingStandbys.isEmpty()) { +assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid, +numStandbyReplicas, +standbyTaskClientsByTaskLoad, +tasksToRemainingStandbys); +} + +return kafkaStreamsAssignments; +} + +private static Map loadBasedStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); +final Map tasksToRemainingStandbys = applicationState.allTasks().values().stream() +.collect(Collectors.toMap(TaskInfo::id, taskInfo -> numStandbyReplicas)); +final Map streamStates = applicationState.kafkaStreamsStates(false); + +final Set statefulTaskIds = applicationState.allTasks().values().stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toSet()); +final Map clients = kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap( +entry -> entry.getKey().id(), +Map.Entry::getValue +)); + +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments); + standbyTaskClientsByTaskLoad.offerAll(streamStates.keySet().stream().map(ProcessId::id).collect(Collectors.toSet())); +for (final TaskId task : statefulTaskIds) { +assignStandbyTasksForActiveTask(numStandbyReplicas, clients, +tasksToRemainingStandbys, standbyTaskClientsByTaskLoad, task); +} +return kafkaStreamsAssignments; +} + +private static void assignStandbyTasksForActiveTask(final int numStandbyReplicas, +final Map clients, +final Map tasksToRemainingStandbys, +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, +
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619917677 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619915938 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619907267 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]
ableegoldman commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1619896882 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final TaskInfo task, } return true; } + +private static Map tagBasedStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); +final Map tasksToRemainingStandbys = applicationState.allTasks().values().stream() +.collect(Collectors.toMap(TaskInfo::id, taskInfo -> numStandbyReplicas)); +final Map streamStates = applicationState.kafkaStreamsStates(false); + +final Set rackAwareAssignmentTags = new HashSet<>(getRackAwareAssignmentTags(applicationState)); +final TagStatistics tagStatistics = new TagStatistics(applicationState); + +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments); + +final Set statefulTaskIds = applicationState.allTasks().values().stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toSet()); +final Map clientsByUuid = kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap( +entry -> entry.getKey().id(), +Map.Entry::getValue +)); +final Function> clientTagGetter = createClientTagGetter(applicationState); + +final Map pendingStandbyTasksToClientId = new HashMap<>(); +for (final TaskId statefulTaskId : statefulTaskIds) { +for (final KafkaStreamsAssignment assignment : clientsByUuid.values()) { +if (assignment.tasks().containsKey(statefulTaskId)) { +assignStandbyTasksToClientsWithDifferentTags( +numStandbyReplicas, +standbyTaskClientsByTaskLoad, +statefulTaskId, +assignment.processId(), +rackAwareAssignmentTags, +streamStates, +kafkaStreamsAssignments, +tasksToRemainingStandbys, +tagStatistics.tagKeyToValues, 
+tagStatistics.tagEntryToClients, +pendingStandbyTasksToClientId, +clientTagGetter +); +} +} +} + +if (!tasksToRemainingStandbys.isEmpty()) { +assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid, +numStandbyReplicas, +standbyTaskClientsByTaskLoad, +tasksToRemainingStandbys); +} + +return kafkaStreamsAssignments; +} + +private static Map loadBasedStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); +final Map tasksToRemainingStandbys = applicationState.allTasks().values().stream() +.collect(Collectors.toMap(TaskInfo::id, taskInfo -> numStandbyReplicas)); +final Map streamStates = applicationState.kafkaStreamsStates(false); + +final Set statefulTaskIds = applicationState.allTasks().values().stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toSet()); +final Map clients = kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap( +entry -> entry.getKey().id(), +Map.Entry::getValue +)); + +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments); + standbyTaskClientsByTaskLoad.offerAll(streamStates.keySet().stream().map(ProcessId::id).collect(Collectors.toSet())); +for (final TaskId task : statefulTaskIds) { +assignStandbyTasksForActiveTask(numStandbyReplicas, clients, +tasksToRemainingStandbys, standbyTaskClientsByTaskLoad, task); +} +return kafkaStreamsAssignments; +} + +private static void assignStandbyTasksForActiveTask(final int numStandbyReplicas, +final Map clients, +final Map tasksToRemainingStandbys, +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, +
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619898066 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619894784

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
 }
 }
-// The minimum required quota that each member needs to meet for a balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the subscriptions or above

Review Comment:
nit:
// Revoke the partitions that either:
// 1. Are not part of the member's current subscription.
// 2. Exceed the maximum quota assigned to each member.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
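For readers following the thread, the two revocation conditions named in the nit above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the PR's `maybeRevokePartitions()`: the `RevokeSketch` class, the simplified `TopicPartition` type, the `retained` helper and the fixed `maxQuota` argument are all hypothetical, and the real builder works on topic IDs and tracks which members may keep an extra partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

class RevokeSketch {
    // Hypothetical, simplified stand-in for a topic partition.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Keeps a partition only if (1) its topic is still subscribed and
    // (2) the member has not yet reached maxQuota; everything else is revoked.
    static List<TopicPartition> retained(List<TopicPartition> current,
                                         Set<String> subscribedTopics,
                                         int maxQuota) {
        List<TopicPartition> kept = new ArrayList<>();
        for (TopicPartition tp : current) {
            if (subscribedTopics.contains(tp.topic) && kept.size() < maxQuota) {
                kept.add(tp);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<TopicPartition> current = Arrays.asList(
            new TopicPartition("foo", 0),
            new TopicPartition("foo", 1),
            new TopicPartition("bar", 0),
            new TopicPartition("foo", 2));
        // bar-0 is dropped (not subscribed), foo-2 is dropped (above the quota of 2).
        System.out.println(retained(current, Collections.singleton("foo"), 2)); // prints [foo-0, foo-1]
    }
}
```

Partitions that fail either check would then go back into the pool of unassigned partitions for the later assignment step.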
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619892264

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
 }
 }
-// The minimum required quota that each member needs to meet for a balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of members
+// who should receive an extra partition.

Review Comment:
nit: members that* should
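As a worked example of the quota arithmetic in the hunk above: integer division gives the minimum quota per member, and the remainder is the number of members that get one extra partition. The sketch below is a standalone illustration; the `QuotaSketch` class and `computeQuota` helper are hypothetical names, and only the two formulas are taken from the diff.

```java
class QuotaSketch {
    // Returns {minimumMemberQuota, membersToGetAnExtraPartition}.
    static int[] computeQuota(int totalPartitionsCount, int numberOfMembers) {
        if (numberOfMembers <= 0) {
            throw new IllegalArgumentException("numberOfMembers must be positive");
        }
        int minimumMemberQuota = totalPartitionsCount / numberOfMembers;
        int membersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
        return new int[] {minimumMemberQuota, membersToGetAnExtraPartition};
    }

    public static void main(String[] args) {
        // 10 partitions over 3 members: each gets at least 3, one member gets a 4th.
        int[] quota = computeQuota(10, 3);
        System.out.println("minimumMemberQuota=" + quota[0] + ", extra=" + quota[1]);
        // prints minimumMemberQuota=3, extra=1
    }
}
```

The two values always account for every partition: `minimumMemberQuota * numberOfMembers + extra == totalPartitionsCount`.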
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619888416

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -53,8 +42,17 @@
  * The assignment builder prioritizes the properties in the following order:
  * Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
-private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {
+private static final Class UNMODIFIALBE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass();
+private static final Class EMPTY_MAP_CLASS = Collections.emptyMap().getClass();
+
+/**
+ * @return True if the provided map is an UnmodifiableMap or EmptyMap. Those classes are not
+ * public hence we cannot use the `instanceof` operator.
+ */
+private static boolean isImmutableMap(Map map) {
+return UNMODIFIALBE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map);

Review Comment:
nit: UNMODIFIABLE *
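The class-capture trick in the hunk above can be tried in isolation. The sketch below mirrors the diff with the constant spelled as the nit suggests; the wrapper class name `ImmutableMapCheck` and the `Class<?>` / `Map<?, ?>` type arguments are assumptions, since the generics did not survive in the quoted diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ImmutableMapCheck {
    // The JDK wrapper classes are not public, so their runtime classes are
    // captured once from sample instances instead of being named directly.
    private static final Class<?> UNMODIFIABLE_MAP_CLASS =
        Collections.unmodifiableMap(new HashMap<>()).getClass();
    private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass();

    static boolean isImmutableMap(Map<?, ?> map) {
        return UNMODIFIABLE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map);
    }

    public static void main(String[] args) {
        Map<String, Integer> mutable = new HashMap<>();
        System.out.println(isImmutableMap(mutable));                              // prints false
        System.out.println(isImmutableMap(Collections.unmodifiableMap(mutable))); // prints true
        System.out.println(isImmutableMap(Collections.emptyMap()));               // prints true
    }
}
```

One limitation of this approach: it only recognizes the two `java.util.Collections` wrappers, so other immutable maps (for example those returned by `Map.of(...)`) would be reported as mutable.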
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]
ableegoldman commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1619808751 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java: ## @@ -30,13 +30,13 @@ /** * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment */ -class ConstrainedPrioritySet { +public class ConstrainedPrioritySet { private final PriorityQueue clientsByTaskLoad; private final BiFunction constraint; private final Set uniqueClients = new HashSet<>(); -ConstrainedPrioritySet(final BiFunction constraint, +public ConstrainedPrioritySet(final BiFunction constraint, final Function weight) { Review Comment: nit: adjust parameter indentation ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final TaskInfo task, } return true; } + +private static Map tagBasedStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); +final Map tasksToRemainingStandbys = applicationState.allTasks().values().stream() +.collect(Collectors.toMap(TaskInfo::id, taskInfo -> numStandbyReplicas)); +final Map streamStates = applicationState.kafkaStreamsStates(false); + +final Set rackAwareAssignmentTags = new HashSet<>(getRackAwareAssignmentTags(applicationState)); +final TagStatistics tagStatistics = new TagStatistics(applicationState); + +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments); + +final Set statefulTaskIds = applicationState.allTasks().values().stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toSet()); +final Map clientsByUuid = kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap( +entry -> 
entry.getKey().id(), +Map.Entry::getValue +)); +final Function> clientTagGetter = createClientTagGetter(applicationState); + +final Map pendingStandbyTasksToClientId = new HashMap<>(); +for (final TaskId statefulTaskId : statefulTaskIds) { +for (final KafkaStreamsAssignment assignment : clientsByUuid.values()) { +if (assignment.tasks().containsKey(statefulTaskId)) { +assignStandbyTasksToClientsWithDifferentTags( +numStandbyReplicas, +standbyTaskClientsByTaskLoad, +statefulTaskId, +assignment.processId(), +rackAwareAssignmentTags, +streamStates, +kafkaStreamsAssignments, +tasksToRemainingStandbys, +tagStatistics.tagKeyToValues, +tagStatistics.tagEntryToClients, +pendingStandbyTasksToClientId, +clientTagGetter +); +} +} +} + +if (!tasksToRemainingStandbys.isEmpty()) { +assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid, +numStandbyReplicas, +standbyTaskClientsByTaskLoad, +tasksToRemainingStandbys); +} + +return kafkaStreamsAssignments; +} + +private static Map loadBasedStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); +final Map tasksToRemainingStandbys = applicationState.allTasks().values().stream() +.collect(Collectors.toMap(TaskInfo::id, taskInfo -> numStandbyReplicas)); +final Map streamStates = applicationState.kafkaStreamsStates(false); + +final Set statefulTaskIds = applicationState.allTasks().values().stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toSet()); +final Map clients = kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap( +entry -> entry.getKey().id(), +Map.Entry::getValue +)); + +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments); + standbyTaskClientsByTaskLoad.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619884277 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -18,29 +18,18 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.server.common.TopicIdPartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static java.lang.Math.min; /** - * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with + * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with Review Comment: Interesting… So you're fine with adding those names to the public interface (what the end user really sees) but not with following the same naming scheme for the internal classes (which the end user doesn't see at all). In my opinion, it makes sense to align them, as it will make things more coherent.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619875174 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -18,29 +18,18 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.server.common.TopicIdPartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static java.lang.Math.min; /** - * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with + * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with Review Comment: I feel like homogeneous and heterogeneous are harder names from a user's point of view; I think we should keep it as is.
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
gongxuanzhang commented on PR #16097: URL: https://github.com/apache/kafka/pull/16097#issuecomment-2138615902 I revised this PR. It adds the Spotless Gradle plugin with a minimal configuration. It contains two parts. 1. A checkstyle import order rule. It is currently suppressed for all modules; to enable the check for a module, edit `checkstyle/suppressions.xml`. 2. Automatic formatting of import order. Again, it is not enabled for any module. To enable it, add the module name to `spotlessApplyModules` in `build.gradle`, e.g. `def spotlessApplyModules = ['core']`, then run `./gradlew spotlessApply` to format the code in that module.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
github-actions[bot] commented on PR #15265: URL: https://github.com/apache/kafka/pull/15265#issuecomment-2138611194 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]
ableegoldman commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1619777550 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -244,18 +274,117 @@ public static Map optimizeRackAwareStandbyTas ); LOG.info("Assignment before standby task optimization has cost {}", initialCost); -throw new UnsupportedOperationException("Not yet Implemented."); +final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState); +final BiFunction> getMovableTasks = (source, destination) -> { +return source.tasks().values().stream().filter(task -> task.type() == AssignedTask.Type.STANDBY) +.filter(task -> { +return !destination.tasks().containsKey(task.id()); +}) +.filter(task -> { +final KafkaStreamsState sourceState = kafkaStreamsStates.get(source.processId()); +final KafkaStreamsState destinationState = kafkaStreamsStates.get(source.processId()); +return moveablePredicate.canMoveStandbyTask(sourceState, destinationState, task.id(), kafkaStreamsAssignments); +}) +.map(AssignedTask::id) +.sorted() +.collect(Collectors.toList()); +}; + +final long startTime = System.currentTimeMillis(); +boolean taskMoved = true; +int round = 0; +final RackAwareGraphConstructor graphConstructor = RackAwareGraphConstructorFactory.create( + applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds); +while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) { +taskMoved = false; +round++; +for (int i = 0; i < kafkaStreamsAssignments.size(); i++) { +final UUID clientId1 = clientIds.get(i); +final KafkaStreamsAssignment clientState1 = kafkaStreamsAssignments.get(new ProcessId(clientId1)); +for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) { +final UUID clientId2 = clientIds.get(i); +final KafkaStreamsAssignment clientState2 = kafkaStreamsAssignments.get(new ProcessId(clientId2)); + +final String rack1 = 
clientRacks.get(clientState1.processId().id()).get(); +final String rack2 = clientRacks.get(clientState2.processId().id()).get(); +// Cross rack traffic can not be reduced if racks are the same +if (rack1.equals(rack2)) { +continue; +} + +final List movable1 = getMovableTasks.apply(clientState1, clientState2); +final List movable2 = getMovableTasks.apply(clientState2, clientState1); + +// There's no needed to optimize if one is empty because the optimization +// can only swap tasks to keep the client's load balanced +if (movable1.isEmpty() || movable2.isEmpty()) { +continue; +} + +final List taskIdList = Stream.concat(movable1.stream(), +movable2.stream()) Review Comment: I noticed this is messed up in the original code too, but can you fix the indentation/remove the line break (just here, no need to fix it in the RackAwareTaskAssignor too) ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -244,18 +274,117 @@ public static Map optimizeRackAwareStandbyTas ); LOG.info("Assignment before standby task optimization has cost {}", initialCost); -throw new UnsupportedOperationException("Not yet Implemented."); +final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState); +final BiFunction> getMovableTasks = (source, destination) -> { +return source.tasks().values().stream().filter(task -> task.type() == AssignedTask.Type.STANDBY) +.filter(task -> { +return !destination.tasks().containsKey(task.id()); +}) +.filter(task -> { +final KafkaStreamsState sourceState = kafkaStreamsStates.get(source.processId()); +final KafkaStreamsState destinationState = kafkaStreamsStates.get(source.processId()); +return moveablePredicate.canMoveStandbyTask(sourceState, destinationState, task.id(), kafkaStreamsAssignments); +}) +.map(AssignedTask::id) +.sorted() +.collect(Collectors.toList()); +}; + +final long startTime = System.currentTimeMillis(); +boolean taskMoved =
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]
ableegoldman commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1619732219 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ## @@ -278,7 +278,7 @@ public void processOptimizedAssignments(final Map entry : optimizedAssignments.entrySet()) { final ProcessId processId = entry.getKey(); -final Set assignedTasks = optimizedAssignments.get(processId).assignment(); +final Set assignedTasks = new HashSet<>(optimizedAssignments.get(processId).tasks().values()); Review Comment: This can be a followup PR, but one of the nice things about making KafkaStreamsAssignment mutable is that we should be able to get rid of the `newAssignments` field altogether and stop converting back and forth between the KafkaStreamsAssignment and raw Sets so that we can add/remove tasks. In addition to the obvious code simplification that should result from this change, it should save us a lot of copying things into various different data structures and reduce a lot of overhead ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java: ## @@ -62,14 +64,24 @@ public static KafkaStreamsAssignment of(final ProcessId processId, final Set assignment, final Optional followupRebalanceDeadline) { +this( +processId, +assignment.stream().collect(Collectors.toMap(AssignedTask::id, Function.identity())), +followupRebalanceDeadline +); +} + +private KafkaStreamsAssignment(final ProcessId processId, Review Comment: nit: seems kind of unnecessary to introduce yet another constructor for this, we can just inline things. 
i.e. ``` this.assignment = assignment.stream().collect(Collectors.toMap(AssignedTask::id, Function.identity())); ``` ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java: ## @@ -83,16 +95,10 @@ public ProcessId processId() { /** * - * @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment} + * @return a read-only set of assigned tasks that are part of this {@code KafkaStreamsAssignment} */ -public Set assignment() { -// TODO change assignment to return a map so we aren't forced to copy this into a Set -return new HashSet<>(assignment.values()); -} - -// TODO: merge this with #assignment by having it return a Map -public Set assignedTaskIds() { -return assignment.keySet(); +public Map tasks() { +return unmodifiableMap(assignment); Review Comment: nit: should probably change the field name too (i.e. `assignment` --> `tasks`) ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java: ## @@ -489,10 +489,10 @@ public SortedMap> taskIdsByPreviousConsumer() { } public void setAssignedTasks(final KafkaStreamsAssignment assignment) { -final Set activeTasks = assignment.assignment().stream() +final Set activeTasks = assignment.tasks().values().stream() .filter(task -> task.type() == ACTIVE).map(KafkaStreamsAssignment.AssignedTask::id) .collect(Collectors.toSet()); -final Set standbyTasks = assignment.assignment().stream() +final Set standbyTasks = assignment.tasks().values().stream() .filter(task -> task.type() == STANDBY).map(KafkaStreamsAssignment.AssignedTask::id) .collect(Collectors.toSet()); assignedActiveTasks.taskIds(activeTasks); Review Comment: nit: I keep getting confused by this and then realizing it's just a poorly-named method, can you rename it to `#setTaskIds` or something like that? 
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -785,12 +789,13 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada ); final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get(); final TaskAssignment taskAssignment = assignor.assign(applicationState); -processStreamsPartitionAssignment(clientMetadataMap, taskAssignment); final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment); -userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> { +processStreamsPartitionAssignment(clientMetadataMap, taskAssignment); +userTaskAssignmentListener = (assignment, subscription) -> { assignor.onAssignmentComputed(assignment, subscription, assignmentError); }; Review Comment: Really doesn't matter, this is jus
Re: [PR] MINOR: Add more unit tests to LogSegments [kafka]
showuon commented on code in PR #16085: URL: https://github.com/apache/kafka/pull/16085#discussion_r1619753582 ## core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala: ## @@ -238,5 +245,25 @@ class LogSegmentsTest { assertEquals(Int.MaxValue, LogSegments.sizeInBytes(asList(logSegment))) assertEquals(largeSize, LogSegments.sizeInBytes(asList(logSegment, logSegment))) assertTrue(UnifiedLog.sizeInBytes(asList(logSegment, logSegment)) > Int.MaxValue) + +val logSegments: LogSegments = new LogSegments(topicPartition) +logSegments.add(logSegment) +assertEquals(Int.MaxValue, logSegments.sizeInBytes()) + } + + @Test + def testUpdateDir(): Unit = { +val seg1 = createSegment(1) Review Comment: seg1 should be closed. ## core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala: ## @@ -105,8 +109,6 @@ class LogSegmentsTest { assertFalse(segments.nonEmpty) assertEquals(0, segments.numberOfSegments) assertFalse(segments.contains(offset1)) - -segments.close() Review Comment: You're right, so this is a bug. I think we have to manually close seg1, seg2, and seg3 here; otherwise there will be a resource leak. ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -632,6 +632,15 @@ class LogSegmentTest { Utils.delete(tempDir) } + @Test + def testGetFirstBatchTimestamp(): Unit = { +val segment = createSegment(1) Review Comment: Ditto: this segment should be closed as well.
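The review above asks for test segments to be closed explicitly so that they do not leak resources. As a language-neutral illustration of that pattern (the tests under review are Scala), here is a minimal Java sketch. `FakeSegment` and `CloseSegmentsSketch` are hypothetical stand-ins invented for this example; they are not Kafka classes and do not model `LogSegment` beyond having a `close()` method.

```java
public class CloseSegmentsSketch {
    /** Hypothetical stand-in for a segment that owns a file handle. */
    static final class FakeSegment implements AutoCloseable {
        private final long baseOffset;
        private boolean closed = false;

        FakeSegment(long baseOffset) {
            this.baseOffset = baseOffset;
        }

        long baseOffset() {
            return baseOffset;
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Runs a test-like body and reports whether every segment was closed afterwards. */
    public static boolean runAndCheckClosed() {
        FakeSegment seg1 = new FakeSegment(1);
        FakeSegment seg2 = new FakeSegment(3);
        try {
            // Test assertions would go here; this one just uses both segments.
            if (seg1.baseOffset() >= seg2.baseOffset()) {
                throw new IllegalStateException("segments out of order");
            }
        } finally {
            // Close in reverse creation order, even if an assertion above fails.
            seg2.close();
            seg1.close();
        }
        return seg1.isClosed() && seg2.isClosed();
    }

    public static void main(String[] args) {
        System.out.println(runAndCheckClosed()); // prints true
    }
}
```

Putting the `close()` calls in a `finally` block (or an after-each hook in the test framework) means a failing assertion does not skip the cleanup.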
Re: [PR] MINOR: Make VoterSetHistoryTest.testAddAt make sense [kafka]
dengziming merged PR #16104: URL: https://github.com/apache/kafka/pull/16104
Re: [PR] KAFKA-16639: AsyncKafkaConsumer#close does not send heartbeat to leave group. [kafka]
frankvicky commented on code in PR #16017: URL: https://github.com/apache/kafka/pull/16017#discussion_r1619740146 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -752,6 +752,26 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { assertEquals(1, result.unsentRequests.size(), "Fenced member should resume heartbeat after transitioning to JOINING"); } +@ParameterizedTest +@ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) +public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) { +mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight"); + +membershipManager.leaveGroup(); +assertTrue(membershipManager.isLeavingGroup()); Review Comment: This makes sense. In the unit test for `HeartbeatRequestManager`, we should focus solely on its functionality. I will remove this line.
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1619707209 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -50,14 +51,13 @@ class ConsumerGroupCommandTestUtils { private ConsumerGroupCommandTestUtils() { } -static List generator() { +static List generator(boolean onlyConsumerGroupCoordinator) { Review Comment: Updated it. Thanks.
Re: [PR] MINOR: migrate DescribeConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15908: URL: https://github.com/apache/kafka/pull/15908#issuecomment-2138512364 > > That is why testing time takes longer. In my latest update, I change to run following cases. Also, in old framework, we started KRAFT server with group.coordinator.new.enable=true twice. One for classic group protocol and another for consumer group protocol. In new framework, we can get supported group protocol by ClusterInstance#supportedGroupProtocols, so I use one KRAFT server with group.coordinator.new.enable=true to run two protocols. We can reduce time to setup one KRAFT cluster. I hope this change can reduce overall testing time. Thanks. > > (ZK / KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases > > (KRAFT servers) with (group.coordinator.new.enable=true) with (classic / consumer group protocols) = 2 cases > > agree. could this PR include this change? Yes, I have already made this change. Please take a look at `DescribeConsumerGroupTest#generator`. Thanks.
[jira] [Resolved] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured
[ https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16790. --- Fix Version/s: 3.8.0 3.7.1 Resolution: Fixed > Calls to RemoteLogManager are made before it is configured > -- > > Key: KAFKA-16790 > URL: https://issues.apache.org/jira/browse/KAFKA-16790 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.8.0 >Reporter: Christo Lolov >Assignee: Muralidhar Basani >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) > which in turn calls RemoteLogManager#onLeadershipChange (2), however, the > RemoteLogManager is configured after the BrokerMetadataPublisher starts > running (3, 4). This is incorrect, we either need to initialise the > RemoteLogManager before we start the BrokerMetadataPublisher or we need to > skip calls to onLeadershipChange if the RemoteLogManager is not initialised. > (1) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151] > (2) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737] > (3) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432] > (4) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515] > The way to reproduce the problem is by looking at the following changes > {code:java} > config/kraft/broker.properties             | 10 ++ >  .../main/java/kafka/log/remote/RemoteLogManager.java  |  8 +++- >  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +- >  3 files changed, 22 insertions(+), 2 deletions(-)diff --git > a/config/kraft/broker.properties b/config/kraft/broker.properties > index 2d15997f28..39d126cf87 100644 > --- a/config/kraft/broker.properties > +++ b/config/kraft/broker.properties > @@ -127,3 +127,13 @@ 
log.segment.bytes=1073741824 >  # The interval at which log segments are checked to see if they can be > deleted according >  # to the retention policies >  log.retention.check.interval.ms=30 > + > +remote.log.storage.system.enable=true > +remote.log.metadata.manager.listener.name=PLAINTEXT > +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage > +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar > +remote.log.storage.manager.impl.prefix=rsm.config. > +remote.log.metadata.manager.impl.prefix=rlmm.config. > +rsm.config.dir=/tmp/kafka-remote-storage > +rlmm.config.remote.log.metadata.topic.replication.factor=1 > +log.retention.check.interval.ms=1000 > diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > index 6555b7c0cd..e84a072abc 100644 > --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable { >    // The endpoint for remote log metadata manager to connect to >    private Optional endpoint = Optional.empty(); >    private boolean closed = false; > +   private boolean up = false; >  >    /** >    * Creates RemoteLogManager instance with the given arguments. > @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable { >      // in connecting to the brokers or remote storages. 
>      configureRSM(); >      configureRLMM(); > +     up = true; >    } >  >    public RemoteStorageManager storageManager() { > @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable { >    public void onLeadershipChange(Set partitionsBecomeLeader, >                   Set partitionsBecomeFollower, >                   Map topicIds) { > -     LOGGER.debug("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > +     if (!up) { > +       LOGGER.error("NullPointerException"); > +       return; > +     } > +     LOGGER.error("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); >  >      Map leaderPartitionsWithLeaderEpoch = > filterPartitions(partitionsBecomeLeader) >          .collect(Collectors.toMap( > diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala > b/core/src/main/scala/kafka/server/ReplicaManager.scala > index 35499430d6..bd3f41c3d6 100644 > --- a/core/src/main/scala/kafka/server/Repl
[jira] [Commented] (KAFKA-16858) Flatten SMT throws NPE
[ https://issues.apache.org/jira/browse/KAFKA-16858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850530#comment-17850530 ] Greg Harris commented on KAFKA-16858: - Thanks so much for the additional context [~undone] this is a very odd bug. I managed to reproduce a Flatten NPE with mutable input objects rather than mocking:  {noformat} xformValue.configure(Collections.emptyMap()); Schema innerSchema = SchemaBuilder.struct().optional(); Struct innerStruct = new Struct(innerSchema); AtomicReference valueSchema = new AtomicReference<>(innerSchema); Schema arraySchema = SchemaBuilder.array(new ConnectSchema(Schema.Type.ARRAY) { @Override public Schema valueSchema() { return valueSchema.get(); } }); Schema schema = SchemaBuilder.struct().field("field", arraySchema); Struct value = new Struct(schema).put("field", Collections.singletonList(Collections.singletonList(innerStruct))); valueSchema.set(null); SourceRecord record = new SourceRecord(null, null, "topic", 0, schema, value); xformValue.apply(record);{noformat} It throws this error: {noformat} Cannot invoke "org.apache.kafka.connect.data.Schema.name()" because "schema" is null java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.name()" because "schema" is null   at org.apache.kafka.connect.data.ConnectSchema.expectedClassesFor(ConnectSchema.java:268)   at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:224)   at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)   at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)   at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)   at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)   at org.apache.kafka.connect.data.Struct.put(Struct.java:216)   at org.apache.kafka.connect.data.Struct.put(Struct.java:203)   at 
org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:250)   at org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:164)   at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:79){noformat} This is different from your stacktrace in two ways: # Instead of 4 buildWithSchema calls, there's only 1. This is because my test Struct is nested less deeply than your Struct; a deeper test case behaves almost identically. # Instead of 3 validateValue calls, there are 5. This is because I'm using a nested array "[[Struct]]" instead of your singly-nested array "[Struct]". This one is a bit more important, because the reproduction case doesn't work for singly-nested arrays. The difference is that a singly-nested array has its valueSchema evaluated during `buildUpdatedSchema`, and the doubly-nested array has its valueSchema evaluated during `buildWithSchema`. When the null valueSchema is evaluated during buildUpdatedSchema, it throws this exception instead:  {noformat} valueSchema cannot be null. org.apache.kafka.connect.errors.SchemaBuilderException: valueSchema cannot be null.   at app//org.apache.kafka.connect.data.SchemaBuilder.array(SchemaBuilder.java:363)   at app//org.apache.kafka.connect.transforms.util.SchemaUtil.copySchemaBasics(SchemaUtil.java:29)   at app//org.apache.kafka.connect.transforms.Flatten.convertFieldSchema(Flatten.java:225)   at app//org.apache.kafka.connect.transforms.Flatten.buildUpdatedSchema(Flatten.java:201)   at app//org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:156)   at app//org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:79){noformat} I was unable to reproduce this with non-mutable schemas; they trigger the NPE too early, while the input Struct is being constructed. Some follow-ups: * Are you able to provide an anonymized form of your schema directly, rather than just a high-level "Array of Structs"? 
I'm wondering if your schema is capable of triggering the use of the mutable SchemaWrapper [https://github.com/confluentinc/schema-registry/blob/7b886f309c83041d4f2a5b41b5910f3b8002413a/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java#L1779] inside the ProtobufConverter. * I don't have an explanation of how this can happen for empty and non-present arrays, as it looks like validateValue(ConnectSchema:255) can only be triggered by non-empty lists. * w.r.t. the variable validateValue depth: Are you saying that in _error cases_ the recursion depth is unpredictable, or in general? The validateValue should be called at every or almost every location in the tree of values, so I would expect to see lots of different recursion depths. Maybe you can share some more stacktraces as examples. * So far in this investigation, I'm trying to find the source of the null in hopes that we can prevent it, a
Re: [PR] KAFKA-16790: Update RemoteLogManager configuration in broker server [kafka]
showuon merged PR #16005: URL: https://github.com/apache/kafka/pull/16005 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]
showuon commented on PR #16005: URL: https://github.com/apache/kafka/pull/16005#issuecomment-2138452239 Failed tests are unrelated.
Re: [PR] Improve producer ID expiration performance [kafka]
jolshan merged PR #16075: URL: https://github.com/apache/kafka/pull/16075
[PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan opened a new pull request, #16130: URL: https://github.com/apache/kafka/pull/16130 As per KIP-1022, we will rename the unstable metadata versions enabled config to support all feature versions. `Features` is also updated to return the latest production and latest testing versions of each feature. A feature is production ready when the corresponding metadata version (bootstrapMetadataVersion) is production ready. Adds tests for the feature usage of the unstableFeatureVersionsEnabled config.
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan merged PR #15685: URL: https://github.com/apache/kafka/pull/15685
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2138416302 Test failures are unrelated. Merging.
[PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [WIP] [kafka]
apourchet opened a new pull request, #16129: URL: https://github.com/apache/kafka/pull/16129 This fills in the implementation details of the standby task assignment utility functions within TaskAssignmentUtils.
Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]
ableegoldman merged PR #16033: URL: https://github.com/apache/kafka/pull/16033
Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]
ableegoldman commented on PR #16033: URL: https://github.com/apache/kafka/pull/16033#issuecomment-2138398308 Test failures are unrelated, merging to trunk
Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]
ableegoldman commented on code in PR #16033: URL: https://github.com/apache/kafka/pull/16033#discussion_r1619561495 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -16,78 +16,395 @@ */ package org.apache.kafka.streams.processor.assignment; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.SortedSet; +import java.util.UUID; +import java.util.stream.Collectors; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.internals.assignment.Graph; +import org.apache.kafka.streams.processor.internals.assignment.MinTrafficGraphConstructor; +import org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructor; +import org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructorFactory; +import org.apache.kafka.streams.StreamsConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A set of utilities to help implement task assignment via the {@link TaskAssignor} */ public final class TaskAssignmentUtils { +private static final Logger LOG = LoggerFactory.getLogger(TaskAssignmentUtils.class); + +private TaskAssignmentUtils() {} + /** - * Assign standby tasks to KafkaStreams clients according to the default logic. 
- * - * If rack-aware client tags are configured, the rack-aware standby task assignor will be used + * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients * - * @param applicationStatethe metadata and other info describing the current application state - * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients + * @param applicationState the metadata and other info describing the current application state * - * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default - * standby assignment + * @return a new map containing an assignment that replicates exactly the previous assignment reported + * in the applicationState */ -public static Map defaultStandbyTaskAssignment( -final ApplicationState applicationState, -final Map kafkaStreamsAssignments -) { -throw new UnsupportedOperationException("Not Implemented."); +public static Map identityAssignment(final ApplicationState applicationState) { +final Map assignments = new HashMap<>(); +applicationState.kafkaStreamsStates(false).forEach((processId, state) -> { +final Set tasks = new HashSet<>(); +state.previousActiveTasks().forEach(taskId -> { +tasks.add(new AssignedTask(taskId, +AssignedTask.Type.ACTIVE)); +}); +state.previousStandbyTasks().forEach(taskId -> { +tasks.add(new AssignedTask(taskId, +AssignedTask.Type.STANDBY)); +}); + +final KafkaStreamsAssignment newAssignment = KafkaStreamsAssignment.of(processId, tasks); +assignments.put(processId, newAssignment); +}); +return assignments; } /** - * Optimize the active task assignment for rack-awareness + * Optimize active task assignment for rack awareness. This optimization is based on the + * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost} + * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost} + * configs which balance cross rack traffic minimization and task movement. 
+ * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting + * assignment, but can increase the number of tasks shuffled around between clients. + * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client + * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher + * cross-rack traffic. + * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value, + * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0, + * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment. + * + * Note: this method will modify the input {@link KafkaStreamsAssignment} objects and return the same map. + * It does not make a copy of the map or the KafkaStreamsAssignment objects. + * + * This method optim
Re: [PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]
muralibasani commented on PR #16005: URL: https://github.com/apache/kafka/pull/16005#issuecomment-2138352697 @nikramakrishnan @showuon there are some test failures, but I'm not sure whether they are related to our changes.
[jira] [Commented] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850511#comment-17850511 ]

Chia-Ping Tsai commented on KAFKA-15305:
----------------------------------------

[~kirktrue] Since you unassigned this ticket, I will take it over and file a PR following the solution above.

> The background thread should try to process the remaining task until the shutdown timer is expired
> --------------------------------------------------------------------------------------------------
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Chia-Ping Tsai
> Priority: Major
> Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a grace period to process things before shutting down. The background thread currently doesn't do that; when close() is initiated, it immediately closes all of its dependencies.
>
> This might not be desirable because there could be remaining tasks to be processed before closing. Maybe the correct thing to do is to first stop accepting API requests, second, let runOnce() continue to run until the shutdown timer expires, and then force-close all of its dependencies.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
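The three steps proposed in the issue description (stop accepting requests, keep calling runOnce() until the shutdown timer expires, then force-close) can be sketched roughly as follows. This is a simplified, single-threaded illustration and not the consumer's actual background thread: the class `GracefulCloseSketch` and its `submit`/`close` methods are hypothetical names, and only `runOnce()` echoes a name from the ticket.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, single-threaded stand-in for the consumer's background thread.
// None of these names come from the Kafka code base.
final class GracefulCloseSketch {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean accepting = true;

    // Once close() has begun, new work is rejected (step 1 relies on this).
    boolean submit(Runnable task) {
        if (!accepting) {
            return false;
        }
        pending.add(task);
        return true;
    }

    // One loop iteration: run a single pending task, if there is one.
    void runOnce() {
        Runnable task = pending.poll();
        if (task != null) {
            task.run();
        }
    }

    // Returns the number of tasks that had to be dropped at the deadline.
    int close(long timeoutMs) {
        // Step 1: stop accepting new requests.
        accepting = false;
        long deadlineNs = System.nanoTime() + timeoutMs * 1_000_000L;
        // Step 2: keep draining until the queue is empty or the timer expires.
        while (!pending.isEmpty() && System.nanoTime() - deadlineNs < 0) {
            runOnce();
        }
        // Step 3: force-close, discarding whatever is left.
        int dropped = pending.size();
        pending.clear();
        return dropped;
    }

    public static void main(String[] args) {
        GracefulCloseSketch worker = new GracefulCloseSketch();
        worker.submit(() -> System.out.println("task 1"));
        worker.submit(() -> System.out.println("task 2"));
        System.out.println("dropped=" + worker.close(1_000));
        System.out.println("accepted after close=" + worker.submit(() -> { }));
    }
}
```

With a generous timeout the queue drains fully; with a timeout of zero the loop body never runs and every pending task is dropped, which corresponds to the current "close immediately" behaviour the ticket describes.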
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on PR #15986: URL: https://github.com/apache/kafka/pull/15986#issuecomment-2138338743

Thanks for the PR, @jsancio. A few meta-comments:

- I'm not sure I see the benefit to changing the `toString` functions to use `String.format`. It seems more brittle than the simple string concatenation approach. Unless you want to print something in a specific way, like `String.format("%02d", myInt)`. But that isn't the case here.
- Changing the SharedServer constructor is a huge pain and generates a lot of churn. Since the thing you're passing comes from the static configuration anyway, let's not do that.
- I think we should try to avoid doing too much validation in `KafkaConfig`. Things like hostnames should be resolved when we actually need them. It would be silly for one unresolvable hostname to make configuration validation fail, and hence fail the whole kcontroller startup process.
- I don't think we want to change all of the tests to use `controller.quorum.bootstrap.servers`. We still need to support the old configuration. Let's make it so that tests using IBP_3_8_IV0 or newer use the new thing, and tests using an older MV use the old configuration. That way we will have good coverage.
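To make the first meta-comment concrete, here is a small sketch comparing the two `toString` styles. The class `ExampleState` and its fields are hypothetical and are not the `FollowerState` from the PR; the point is only that both styles produce the same text, while `String.format` pays off mainly when specific formatting such as zero-padding is wanted.

```java
// Hypothetical example class; not the FollowerState from the PR.
final class ExampleState {
    private final int epoch;
    private final int leaderId;

    ExampleState(int epoch, int leaderId) {
        this.epoch = epoch;
        this.leaderId = leaderId;
    }

    // Plain concatenation: each field's label and value sit on the same line.
    String concatToString() {
        return "ExampleState(" +
            "epoch=" + epoch +
            ", leaderId=" + leaderId +
            ')';
    }

    // String.format: the format string and the argument list must be kept in sync by hand.
    String formatToString() {
        return String.format("ExampleState(epoch=%d, leaderId=%d)", epoch, leaderId);
    }

    public static void main(String[] args) {
        ExampleState state = new ExampleState(5, 1);
        System.out.println(state.concatToString());
        System.out.println(state.formatToString());
        // Where String.format is genuinely useful: zero-padded output.
        System.out.println(String.format("%02d", 7));
    }
}
```

The brittleness mentioned in the comment shows up when a field is added or reordered: with `String.format`, a placeholder and its argument can drift apart, and a mismatch is only caught at runtime.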
Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]
apourchet commented on code in PR #16123: URL: https://github.com/apache/kafka/pull/16123#discussion_r1619452846 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster fullMetadata, final Optional userTaskAssignor = userTaskAssignorSupplier.get(); +UserTaskAssignmentListener userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> { }; Review Comment: You can't change it once you make it final.
[jira] [Assigned] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-15305:
--------------------------------------

    Assignee: Chia-Ping Tsai

> The background thread should try to process the remaining task until the shutdown timer is expired
> --------------------------------------------------------------------------------------------------
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Chia-Ping Tsai
> Priority: Major
> Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a grace period to process things before shutting down. The background thread currently doesn't do that; when close() is initiated, it immediately closes all of its dependencies.
>
> This might not be desirable because there could be remaining tasks to be processed before closing. Maybe the correct thing to do is to first stop accepting API requests, second, let runOnce() continue to run until the shutdown timer expires, and then force-close all of its dependencies.
[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850215#comment-17850215 ]

Kirk True edited comment on KAFKA-16792 at 5/29/24 10:04 PM:
-------------------------------------------------------------

These tests work with KAFKA-16200:
- testResetUsingAutoResetPolicy
- testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
- testCurrentLag
- testFetchStableOffsetThrowInPoll
- testListOffsetShouldUpdateSubscriptions
- testPollReturnsRecords
- testResetToCommittedOffset

was (Author: kirktrue):
These tests work with KAFKA-16200:
- testResetUsingAutoResetPolicy
- testFetchStableOffsetThrowInCommitted (wasn't in the above list)
- testFetchStableOffsetThrowInPosition (wasn't in the above list)

The following still don't work:
- testCurrentLag
- testFetchStableOffsetThrowInPoll
- testListOffsetShouldUpdateSubscriptions
- testPollReturnsRecords
- testResetToCommittedOffset

> Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Enable the following unit tests for the new async consumer in KafkaConsumerTest:
> - testFetchStableOffsetThrowInPoll
> - testCurrentLag
> - testListOffsetShouldUpdateSubscriptions
> - testResetToCommittedOffset
> - testResetUsingAutoResetPolicy
> - testPollReturnsRecords
[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850215#comment-17850215 ]

Kirk True edited comment on KAFKA-16792 at 5/29/24 10:03 PM:
-------------------------------------------------------------

These tests work with KAFKA-16200:
- testResetUsingAutoResetPolicy
- testFetchStableOffsetThrowInCommitted (wasn't in the above list)
- testFetchStableOffsetThrowInPosition (wasn't in the above list)

The following still don't work:
- testCurrentLag
- testFetchStableOffsetThrowInPoll
- testListOffsetShouldUpdateSubscriptions
- testPollReturnsRecords
- testResetToCommittedOffset

was (Author: kirktrue):
These tests work with KAFKA-16200:
- testResetUsingAutoResetPolicy
- testFetchStableOffsetThrowInCommitted (wasn't in the above list)
- testFetchStableOffsetThrowInPosition (wasn't in the above list)

The following still don't work:
- testCurrentLag
- testListOffsetShouldUpdateSubscriptions
- testPollReturnsRecords
- testResetToCommittedOffset

> Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Enable the following unit tests for the new async consumer in KafkaConsumerTest:
> - testFetchStableOffsetThrowInPoll
> - testCurrentLag
> - testListOffsetShouldUpdateSubscriptions
> - testResetToCommittedOffset
> - testResetUsingAutoResetPolicy
> - testPollReturnsRecords
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986: URL: https://github.com/apache/kafka/pull/15986#discussion_r1619507872 ## raft/src/main/java/org/apache/kafka/raft/FollowerState.java: ## @@ -156,22 +156,24 @@ public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) { log.debug( "Rejecting vote request from candidate ({}) since we already have a leader {} in epoch {}", candidateKey, -leaderId(), +leader, epoch ); return false; } @Override public String toString() { -return "FollowerState(" + -"fetchTimeoutMs=" + fetchTimeoutMs + -", epoch=" + epoch + -", leaderId=" + leaderId + -", voters=" + voters + -", highWatermark=" + highWatermark + -", fetchingSnapshot=" + fetchingSnapshot + -')'; +return String.format( Review Comment: Hmm, I'm not sure I see a lot of benefit to this change.
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986: URL: https://github.com/apache/kafka/pull/15986#discussion_r1619506788 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -21,12 +21,12 @@ import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.{NewPartitions, NewTopic} import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} -import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition} Review Comment: It would be better to do this in a separate cleanup PR. ## core/src/test/resources/log4j.properties: ## @@ -21,6 +21,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.kafka=WARN log4j.logger.org.apache.kafka=WARN - Review Comment: It would be better to do this in a separate cleanup PR.
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
soarez commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1619521840 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -105,6 +108,49 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map props) { +List configValues = super.validate(props).configValues(); +String emitCheckpointsValue = Optional.ofNullable(props.get(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED_DEFAULT)); +String syncGroupOffsetsValue = Optional.ofNullable(props.get(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED_DEFAULT)); +String emitOffsetSyncsValue = Optional.ofNullable(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).orElse(Boolean.toString(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED_DEFAULT)); + +if ("false".equals(emitCheckpointsValue) && "false".equals(syncGroupOffsetsValue)) { +ConfigValue syncGroupOffsets = configValues.stream().filter(prop -> MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED.equals(prop.name())) +.findAny() +.orElseGet(() -> { +ConfigValue result = new ConfigValue(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED); +configValues.add(result); +return result; +}); + +ConfigValue emitCheckpoints = configValues.stream().filter(prop -> MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED.equals(prop.name())) +.findAny() +.orElseGet(() -> { +ConfigValue result = new ConfigValue(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED); +configValues.add(result); +return result; +}); + +String errorMessage = "MirrorCheckpointConnector can't run with both" + +MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED + ", " + MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED + "set to false"; +syncGroupOffsets.addErrorMessage(errorMessage); 
+emitCheckpoints.addErrorMessage(errorMessage); +} +if ("false".equals(emitOffsetSyncsValue) && ("true".equals(emitCheckpointsValue) || "true".equals(syncGroupOffsetsValue))) { +ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name())) +.findAny() +.orElseGet(() -> { +ConfigValue result = new ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED); +configValues.add(result); +return result; +}); + +emitOffsetSyncs.addErrorMessage("MirrorCheckpointConnector can't run while MirrorSourceConnector configured with" + +MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + "set to false"); +} +return new Config(configValues); Review Comment: Thanks for the explanation, Omnia. That makes sense. Could `MirrorCheckpointConfig` have a new method that performs these validations, which we then call from `MirrorCheckpointConnector#validate`? Maybe the method could return `Optional` if there is any validation issue?
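The suggestion in this review comment could look roughly like the sketch below. It is only an illustration under several assumptions: the type parameter of the proposed `Optional` is not visible in the comment, so `Optional<String>` (an error message) is chosen here purely for the example; the class `CheckpointValidationSketch` and the method name `validate` are hypothetical; the default values are assumed for the sketch; and only the first of the two checks in the diff is shown. The two property keys are the ones used in the PR's tests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; the real MirrorCheckpointConfig API may look different.
final class CheckpointValidationSketch {
    static final String EMIT_CHECKPOINTS_ENABLED = "emit.checkpoints.enabled";
    static final String SYNC_GROUP_OFFSETS_ENABLED = "sync.group.offsets.enabled";
    // Defaults assumed for this sketch only.
    static final boolean EMIT_CHECKPOINTS_ENABLED_DEFAULT = true;
    static final boolean SYNC_GROUP_OFFSETS_ENABLED_DEFAULT = false;

    // Returns an error message if the combination is invalid, otherwise Optional.empty().
    static Optional<String> validate(Map<String, String> props) {
        String emitCheckpoints = Optional.ofNullable(props.get(EMIT_CHECKPOINTS_ENABLED))
            .orElse(Boolean.toString(EMIT_CHECKPOINTS_ENABLED_DEFAULT));
        String syncGroupOffsets = Optional.ofNullable(props.get(SYNC_GROUP_OFFSETS_ENABLED))
            .orElse(Boolean.toString(SYNC_GROUP_OFFSETS_ENABLED_DEFAULT));

        if ("false".equals(emitCheckpoints) && "false".equals(syncGroupOffsets)) {
            return Optional.of("MirrorCheckpointConnector can't run with both "
                + SYNC_GROUP_OFFSETS_ENABLED + " and " + EMIT_CHECKPOINTS_ENABLED + " set to false");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(EMIT_CHECKPOINTS_ENABLED, "false");
        props.put(SYNC_GROUP_OFFSETS_ENABLED, "false");
        System.out.println(validate(props).orElse("valid"));
    }
}
```

A connector-level `validate` could then attach the returned message to the relevant `ConfigValue` entries, keeping the rule itself in one place.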
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986: URL: https://github.com/apache/kafka/pull/15986#discussion_r1619520419 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -55,15 +57,41 @@ final public class VoterSet { } /** - * Returns the socket address for a given voter at a given listener. + * Returns the node information for all the given voter ids and listener. * - * @param voter the id of the voter - * @param listener the name of the listener - * @return the socket address if it exists, otherwise {@code Optional.empty()} + * @param voterIds the ids of the voters + * @param listenerName the name of the listener + * @return the node information for all of the voter ids + * @throws IllegalArgumentException if there are missing endpoints */ -public Optional voterAddress(int voter, String listener) { -return Optional.ofNullable(voters.get(voter)) -.flatMap(voterNode -> voterNode.address(listener)); +public Set voterNodes(Stream voterIds, ListenerName listenerName) { Review Comment: This seems like the wrong behavior to me for the case where one of the voters doesn't have a specific listener, but the other ones do. Wouldn't we want to continue to use the other, working voters in that case?
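One way to read the alternative raised here is a lookup that skips voters lacking the requested listener instead of throwing. The sketch below illustrates that idea with plain JDK types; `VoterLookupSketch`, `reachableVoters`, and the nested-map representation of the voter set are all hypothetical and do not reflect the actual `VoterSet` internals.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical lookup; the voter set is modelled as voterId -> (listener name -> address).
final class VoterLookupSketch {
    // Returns addresses only for voters that are known and expose the listener;
    // unknown voters and voters without the listener are skipped rather than failing the call.
    static Set<InetSocketAddress> reachableVoters(
            Map<Integer, Map<String, InetSocketAddress>> voters,
            Stream<Integer> voterIds,
            String listener) {
        return voterIds
            .map(voters::get)
            .filter(Objects::nonNull)
            .map(listeners -> listeners.get(listener))
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, InetSocketAddress>> voters = Map.of(
            1, Map.of("CONTROLLER", InetSocketAddress.createUnresolved("host1", 9093)),
            2, Map.of("OTHER", InetSocketAddress.createUnresolved("host2", 9094)));
        // Voter 2 lacks the CONTROLLER listener and voter 3 is unknown; both are skipped.
        System.out.println(reachableVoters(voters, Stream.of(1, 2, 3), "CONTROLLER"));
    }
}
```

Whether silently skipping is preferable to throwing depends on how callers treat a partial result; the sketch only shows that the "keep using the working voters" behaviour is straightforward to express.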
[jira] [Assigned] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-15305:
---------------------------------

    Assignee: (was: Kirk True)

> The background thread should try to process the remaining task until the shutdown timer is expired
> --------------------------------------------------------------------------------------------------
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Philip Nee
> Priority: Major
> Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a grace period to process things before shutting down. The background thread currently doesn't do that; when close() is initiated, it immediately closes all of its dependencies.
>
> This might not be desirable because there could be remaining tasks to be processed before closing. Maybe the correct thing to do is to first stop accepting API requests, second, let runOnce() continue to run until the shutdown timer expires, and then force-close all of its dependencies.
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
soarez commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1619486214 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -73,6 +73,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_TOPIC_CONFIG_PREFIX; +import static org.apache.kafka.connect.mirror.MirrorSourceConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX; Review Comment: This constant is declared in MirrorConnectorConfig, so the static import should refer to that class directly. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java: ## @@ -50,6 +50,40 @@ public class MirrorCheckpointConnectorTest { private static final String CONSUMER_GROUP = "consumer-group-1"; private static final Map SOURCE_OFFSET = MirrorUtils.wrapOffset(0); +@Test +public void testEmitCheckpointsAndSyncGroupOffsetsBothDisabled() { +// disable the checkpoint emission +MirrorCheckpointConfig config = new MirrorCheckpointConfig( +makeProps("emit.checkpoints.enabled", "false", +"sync.group.offsets.enabled", "false")); + +Set knownConsumerGroups = new HashSet<>(); +knownConsumerGroups.add(CONSUMER_GROUP); +// MirrorCheckpointConnector as minimum to run taskConfig() +MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, +config); +List> output = connector.taskConfigs(1); +// expect no task will be created +assertEquals(0, output.size(), "MirrorCheckpointConnector not disabled"); +} + +@Test +public void testEmitOffsetSyncsDisabled() { +// disable the checkpoint emission +MirrorCheckpointConfig config = new MirrorCheckpointConfig( +makeProps("emit.checkpoints.enabled", "false", +MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, "false")); + +Set knownConsumerGroups = new HashSet<>(); +knownConsumerGroups.add(CONSUMER_GROUP); +// MirrorCheckpointConnector as minimum to run taskConfig() 
+MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, +config); +List> output = connector.taskConfigs(1); +// expect no task will be created +assertEquals(0, output.size(), "MirrorCheckpointConnector not disabled"); Review Comment: Should this be extracted into a common method? e.g. `assertMirrorCheckpointConnectorDisabled(config)` ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java: ## @@ -437,6 +437,55 @@ public void testSendSyncEvent() { verify(producer, times(5)).send(any(), any()); } +@Test +public void testDisableEmitOffsetSync() { +byte[] recordKey = "key".getBytes(); +byte[] recordValue = "value".getBytes(); +int maxOffsetLag = 50; +int recordPartition = 0; +int recordOffset = 0; +int metadataOffset = 100; +String topicName = "topic"; +String sourceClusterName = "sourceCluster"; + +RecordHeaders headers = new RecordHeaders(); +ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy(); + +@SuppressWarnings("unchecked") +KafkaConsumer consumer = mock(KafkaConsumer.class); +@SuppressWarnings("unchecked") +KafkaProducer producer = mock(KafkaProducer.class); +MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class); +Semaphore outstandingOffsetSyncs = new Semaphore(1); +PartitionState partitionState = new PartitionState(maxOffsetLag); +Map partitionStates = new HashMap<>(); + +MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, +replicationPolicy, maxOffsetLag, producer, outstandingOffsetSyncs, partitionStates, topicName, false); + +SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, +recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, +recordValue.length, recordKey, recordValue, headers, Optional.empty())); + +TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()); 
+partitionStates.put(sourceTopicPartition, partitionState); +RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); + +ArgumentCaptor producerCallback = ArgumentCaptor.forClass(Callback.class); +when(producer.send(any(), producerCallback.capture())).thenAnswer(mockInvocation -> { +producerCallback.getValue().onCompletion(null, null)
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986: URL: https://github.com/apache/kafka/pull/15986#discussion_r1619518236 ## raft/src/main/java/org/apache/kafka/raft/RequestManager.java: ## @@ -17,108 +17,296 @@ package org.apache.kafka.raft; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; -import java.util.List; +import java.util.Iterator; import java.util.Map; -import java.util.OptionalInt; +import java.util.Optional; import java.util.OptionalLong; import java.util.Random; -import java.util.Set; +import org.apache.kafka.common.Node; +/** + * The request manager keeps tracks of the connection with remote replicas. + * + * When sending a request update this type by calling {@code onRequestSent(Node, long, long)}. When + * the RPC returns a response, update this manager with {@code onResponseResult(Node, long, boolean, long)}. + * + * Connections start in the ready state ({@code isReady(Node, long)} returns true). + * + * When a request times out or completes successfully the collection will transition back to the + * ready state. + * + * When a request completes with an error it still transition to the backoff state until + * {@code retryBackoffMs}. + */ public class RequestManager { -private final Map connections = new HashMap<>(); -private final List voters = new ArrayList<>(); +private final Map connections = new HashMap<>(); Review Comment: I don't understand why you are changing this to treat node ID as a string. An Int seems better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
chia7712 commented on PR #16116: URL: https://github.com/apache/kafka/pull/16116#issuecomment-2138325276 @OmniaGM Could you please fix the conflicts?
Re: [PR] MINOR: Delete KafkaSecurityConfigs class [kafka]
chia7712 merged PR #16113: URL: https://github.com/apache/kafka/pull/16113
[jira] [Created] (KAFKA-16864) Copy on write in the Optimized Uniform Assignor
Ritika Reddy created KAFKA-16864:

Summary: Copy on write in the Optimized Uniform Assignor
Key: KAFKA-16864
URL: https://issues.apache.org/jira/browse/KAFKA-16864
Project: Kafka
Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: David Jacot

An optimization for the uniform (homogenous) assignor that avoids creating a copy of all the assignments. Instead, the assignor creates a copy only if the assignment is updated, which is a form of copy-on-write. This change reduces the overhead of the TargetAssignmentBuilder when run with the uniform (homogenous) assignor.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
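The copy-on-write idea in this ticket can be sketched roughly as follows. This is a minimal illustration, not the assignor's actual code: the class `CopyOnWriteAssignment` and its methods `partitionsFor`, `add`, and `copiedMembers` are hypothetical names, and the sketch assumes an assignment is a map from member id to a set of partition numbers.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration of copy-on-write; not the Kafka assignor implementation. */
public class CopyOnWriteAssignment {
    // Shared input assignment. It is only read, never mutated.
    private final Map<String, Set<Integer>> original;
    // Private copies, created lazily for members whose assignment changes.
    private final Map<String, Set<Integer>> updated = new HashMap<>();

    public CopyOnWriteAssignment(Map<String, Set<Integer>> original) {
        this.original = original;
    }

    /** Returns the private copy if one exists, otherwise the shared original set. */
    public Set<Integer> partitionsFor(String memberId) {
        Set<Integer> copy = updated.get(memberId);
        if (copy != null) {
            return copy;
        }
        return original.getOrDefault(memberId, Collections.emptySet());
    }

    /** Copies the member's partitions on the first write, then mutates only the copy. */
    public void add(String memberId, int partition) {
        updated.computeIfAbsent(
            memberId,
            id -> new HashSet<>(original.getOrDefault(id, Collections.emptySet()))
        ).add(partition);
    }

    /** Number of members that actually paid for a copy. */
    public int copiedMembers() {
        return updated.size();
    }
}
```

With this shape, members whose assignment is unchanged keep sharing the original sets, so the cost of copying scales with the number of modified members rather than with the group size.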
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619514531

## raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java: ##
@@ -199,6 +206,34 @@ private static Map parseVoterConnections(
         return voterMap;
     }

+    public static InetSocketAddress parseBootstrapServer(String bootstrapServer) {
+        String host = Utils.getHost(bootstrapServer);
+        if (host == null) {
+            throw new ConfigException(
+                String.format(
+                    "Failed to parse host name from {} for the configuration {}. Each " +
+                    "entry should be in the form \"{host}:{port}\"",
+                    bootstrapServer,
+                    QUORUM_BOOTSTRAP_SERVERS_CONFIG
+                )
+            );
+        }
+
+        Integer port = Utils.getPort(bootstrapServer);
+        if (port == null) {
+            throw new ConfigException(
+                String.format(
+                    "Failed to parse host port from {} for the configuration {}. Each " +
+                    "entry should be in the form \"{host}:{port}\"",
+                    bootstrapServer,
+                    QUORUM_BOOTSTRAP_SERVERS_CONFIG
+                )
+            );
+        }
+
+        return InetSocketAddress.createUnresolved(host, port);

Review Comment:
I think it's a mistake to resolve hostnames as part of configuration validation. Just treat them as strings. Otherwise you could end up not being able to start Kafka because 1 out of 3 hostnames in your bootstrap list could not be resolved (because of a DNS problem or whatever). Resolve a hostname when you're actually using it, not before.
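The point about deferring DNS resolution can be illustrated with a small JDK-only sketch. `BootstrapServerParser` and its `parse` method are hypothetical names introduced here; the `Utils.getHost` and `Utils.getPort` helpers from the diff are not reproduced, and the naive split on the last colon is an assumption that does not handle bracketed IPv6 literals. `InetSocketAddress.createUnresolved` stores the host and port without performing a lookup, so validation of this kind cannot fail because of a resolver problem.

```java
import java.net.InetSocketAddress;

/** Hypothetical sketch: validate "host:port" syntax without touching DNS. */
public class BootstrapServerParser {

    public static InetSocketAddress parse(String entry) {
        // Assumption: plain "host:port" entries; bracketed IPv6 is out of scope here.
        int idx = entry.lastIndexOf(':');
        if (idx <= 0 || idx == entry.length() - 1) {
            throw new IllegalArgumentException(String.format(
                "Failed to parse %s. Each entry should be in the form \"{host}:{port}\"",
                entry));
        }
        String host = entry.substring(0, idx);
        int port;
        try {
            port = Integer.parseInt(entry.substring(idx + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(String.format(
                "Failed to parse port from %s. Each entry should be in the form \"{host}:{port}\"",
                entry), e);
        }
        // No DNS lookup happens here; the address stays unresolved until it is used.
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

A caller would resolve later, at connection time, for example with `new InetSocketAddress(addr.getHostString(), addr.getPort())`. Note also that `String.format` expects `%s` placeholders; the `{}` style belongs to SLF4J-style loggers and is left as literal text by `String.format`.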
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
junrao commented on PR #15673: URL: https://github.com/apache/kafka/pull/15673#issuecomment-2138319133 @clolov Sorry to ping you again. The 3.8 branch is going to be cut by Friday. Will you still be able to complete this PR for 3.8.0?
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619511561

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##
@@ -1635,24 +1693,29 @@ private Optional maybeHandleCommonResponse(
     }

     private void maybeTransition(
-        OptionalInt leaderId,
+        Optional leader,
         int epoch,
         long currentTimeMs
     ) {
+        OptionalInt leaderId = OptionalInt.empty();

Review Comment:
Again, this would be more clearly expressed with `?:` rather than mutating the OptionalInt reference.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619511354 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -53,8 +42,17 @@ * The assignment builder prioritizes the properties in the following order: * Balance > Stickiness. */ -public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { -private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +public class OptimizedUniformAssignmentBuilder { Review Comment: I had the same question, we might need it in the rack aware case for all the common methods
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619510911

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##
@@ -1593,6 +1644,13 @@ private Optional maybeHandleCommonResponse(
         int epoch,
         long currentTimeMs
     ) {
+        Optional leader = Optional.empty();

Review Comment:
This would be more clearly expressed as something like
```
Optional leader = leaderId.isPresent() ?
    partitionState.lastVoterSet().voterNode(leaderId.getAsInt(), channel.listenerName()) :
    Optional.empty();
```
Re: [PR] [MINOR] : Code Cleanup - Misc modules [kafka]
chia7712 commented on code in PR #16067: URL: https://github.com/apache/kafka/pull/16067#discussion_r1619506272 ## core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java: ## @@ -105,12 +106,12 @@ public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException { UpdateMetadataBroker broker = new UpdateMetadataBroker() .setId(0) .setRack("rack") -.setEndpoints(Arrays.asList( -new UpdateMetadataRequestData.UpdateMetadataEndpoint() -.setHost("broker0") -.setPort(9092) -.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) -.setListener(plaintextListener.value()) +.setEndpoints(Collections.singletonList( +new UpdateMetadataRequestData.UpdateMetadataEndpoint() Review Comment: Could you please revert those unrelated changes to reduce the size of PR?
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986: URL: https://github.com/apache/kafka/pull/15986#discussion_r1619507872 ## raft/src/main/java/org/apache/kafka/raft/FollowerState.java: ## @@ -156,22 +156,24 @@ public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) { log.debug( "Rejecting vote request from candidate ({}) since we already have a leader {} in epoch {}", candidateKey, -leaderId(), +leader, epoch ); return false; } @Override public String toString() { -return "FollowerState(" + -"fetchTimeoutMs=" + fetchTimeoutMs + -", epoch=" + epoch + -", leaderId=" + leaderId + -", voters=" + voters + -", highWatermark=" + highWatermark + -", fetchingSnapshot=" + fetchingSnapshot + -')'; +return String.format( Review Comment: I'm not sure I see a lot of benefit to this change.
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986: URL: https://github.com/apache/kafka/pull/15986#discussion_r1619506788 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -21,12 +21,12 @@ import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.{NewPartitions, NewTopic} import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} -import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition} Review Comment: Do this in a separate cleanup PR. ## core/src/test/resources/log4j.properties: ## @@ -21,6 +21,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.kafka=WARN log4j.logger.org.apache.kafka=WARN - Review Comment: Do this in a separate cleanup PR.
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
cmccabe commented on code in PR #15986: URL: https://github.com/apache/kafka/pull/15986#discussion_r1619506439 ## core/src/main/scala/kafka/server/SharedServer.scala: ## @@ -94,6 +95,7 @@ class SharedServer( val time: Time, private val _metrics: Metrics, val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]], + val bootstrapServers: JCollection[InetSocketAddress], Review Comment: I don't see why this needs to be a new argument. We already have KafkaConfig passed in as an argument, and this just comes from `QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers)`. Adding a new argument here creates a lot of churn, which is a lot of work for merges and such.
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
OmniaGM commented on PR #15999: URL: https://github.com/apache/kafka/pull/15999#issuecomment-2138308937 I was hoping for it to land in 3.8.0. I'll raise a PR against 3.8 once we merge it into trunk. However, it shouldn't be a blocker for 3.8.
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
chia7712 commented on PR #15830: URL: https://github.com/apache/kafka/pull/15830#issuecomment-2138307998 @pasharik could you please fix the conflicts?
[jira] [Comment Edited] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850505#comment-17850505 ] Chia-Ping Tsai edited comment on KAFKA-15305 at 5/29/24 9:37 PM: - KAFKA-16639 needs this ticket to fix root cause. As this ticket described, `ConsumerNetworkThread` does not honor the close timeout. Even though we put a heartbeat request to leave group, `ConsumerNetworkThread` will move it to `NetworkClient` and then exit the waiting ... It seems to me the simple solution is to add a method "hasInFlightRequests" to "networkClientDelegate", and then we change the while condition [0] from "timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to "timer.notExpired() && networkClientDelegate.hasInFlightRequests()". {code:java} boolean hasInFlightRequests() { return client.hasInFlightRequests(); } {code} {code:java} do { networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); timer.update(); } while (timer.notExpired() && networkClientDelegate.hasInFlightRequests()); {code} [~kirktrue] WDYT? [0] https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L301 was (Author: chia7712): KAFKA-16639 needs this ticket to fix root cause. As this ticket described, `ConsumerNetworkThread` does not honor the close timeout. Even though we put a heartbeat request to leave group, `ConsumerNetworkThread` will move it to `NetworkClient` and then exit the waiting ... It seems to me the simple solution is to add a method "hasInFlightRequests" to "networkClientDelegate", and then we change the while condition from "timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to "timer.notExpired() && networkClientDelegate.hasInFlightRequests()". 
{code:java}
boolean hasInFlightRequests() {
    return client.hasInFlightRequests();
}
{code}
{code:java}
do {
    networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
    timer.update();
} while (timer.notExpired() && networkClientDelegate.hasInFlightRequests());
{code}
[~kirktrue] WDYT?

> The background thread should try to process the remaining task until the shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Kirk True
> Priority: Major
> Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a grace period to process things before shutting down. The background thread currently doesn't do that, when close() is initiated, it will immediately close all of its dependencies.
>
> This might not be desirable because there could be remaining tasks to be processed before closing. Maybe the correct things to do is to first stop accepting API request, second, let the runOnce() continue to run before the shutdown timer expires, then we can force closing all of its dependencies.
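To see why the proposed loop condition matters, here is a toy simulation, offered as a sketch under stated assumptions rather than the consumer's real code. `CloseLoopSketch`, `FakeDelegate`, and `pollsUntilExit` are hypothetical names; the fake delegate assumes one poll moves the pending request from "unsent" to "in flight" and a later poll receives its response, and a simple poll budget stands in for the shutdown timer.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical simulation of the close loop; not the ConsumerNetworkThread code. */
public class CloseLoopSketch {

    /** Toy stand-in for the network layer: one poll sends, a later poll receives. */
    public static class FakeDelegate {
        public int unsent = 1;    // e.g. the leave-group heartbeat waiting to be sent
        public int inFlight = 0;  // requests sent but not yet answered
        public int polls = 0;

        public void poll() {
            polls++;
            if (unsent > 0) {
                // The request is handed to the client and is now in flight.
                unsent--;
                inFlight++;
            } else if (inFlight > 0) {
                // The response arrives on a later poll.
                inFlight--;
            }
        }
    }

    /** Polls until the budget (standing in for the timer) runs out or keepWaiting is false. */
    public static int pollsUntilExit(FakeDelegate delegate, int maxPolls, BooleanSupplier keepWaiting) {
        do {
            delegate.poll();
        } while (delegate.polls < maxPolls && keepWaiting.getAsBoolean());
        return delegate.polls;
    }

    public static void main(String[] args) {
        FakeDelegate oldStyle = new FakeDelegate();
        pollsUntilExit(oldStyle, 10, () -> oldStyle.unsent > 0);
        System.out.println("unsent-based exit, still in flight: " + oldStyle.inFlight);

        FakeDelegate newStyle = new FakeDelegate();
        pollsUntilExit(newStyle, 10, () -> newStyle.inFlight > 0);
        System.out.println("in-flight-based exit, still in flight: " + newStyle.inFlight);
    }
}
```

In this model the loop that watches only unsent requests exits right after handing the request to the client, while the loop that watches in-flight requests keeps polling until the response has been processed or the budget is exhausted.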
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850506#comment-17850506 ]

Chia-Ping Tsai commented on KAFKA-16639:

KAFKA-15305 is the necessary cure for this issue. Otherwise, the heartbeat request may not be sent in closing. In this ticket we focus on fixing the "uncreated heartbeat request"

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Chia-Ping Tsai
> Assignee: TengYao Chi
> Priority: Critical
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> This bug can be reproduced by immediately closing a consumer which is just created.
> The root cause is that we skip the new heartbeat used to leave group when there is an in-flight heartbeat request ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212]).
> It seems to me the simple solution is that we create a heartbeat request when meeting above situation and then send it by pollOnClose ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62]).
[jira] [Commented] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850505#comment-17850505 ]

Chia-Ping Tsai commented on KAFKA-15305:

KAFKA-16639 needs this ticket to fix root cause. As this ticket described, `ConsumerNetworkThread` does not honor the close timeout. Even though we put a heartbeat request to leave group, `ConsumerNetworkThread` will move it to `NetworkClient` and then exit the waiting ...

It seems to me the simple solution is to add a method "hasInFlightRequests" to "networkClientDelegate", and then we change the while condition from "timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to "timer.notExpired() && networkClientDelegate.hasInFlightRequests()".
{code:java}
boolean hasInFlightRequests() {
    return client.hasInFlightRequests();
}
{code}
{code:java}
do {
    networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
    timer.update();
} while (timer.notExpired() && networkClientDelegate.hasInFlightRequests());
{code}
[~kirktrue] WDYT?

> The background thread should try to process the remaining task until the shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Kirk True
> Priority: Major
> Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a grace period to process things before shutting down. The background thread currently doesn't do that, when close() is initiated, it will immediately close all of its dependencies.
>
> This might not be desirable because there could be remaining tasks to be processed before closing. Maybe the correct things to do is to first stop accepting API request, second, let the runOnce() continue to run before the shutdown timer expires, then we can force closing all of its dependencies.
[jira] [Updated] (KAFKA-16529) Response handling and request sending for voters RPCs
[ https://issues.apache.org/jira/browse/KAFKA-16529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio updated KAFKA-16529:
---
Description:
Implement response handling and request sending for the following RPCs:
# Vote
# BeginQuorumEpoch
# EndQuorumEpoch
# Fetch
# FetchSnapshot

When loading the leader from the quorum-state file don't assume that the leader is in the voter set.

was:
Implement response handling and request sending for the following RPCs:
# Vote
# BeginQuorumEpoch
# Fetch

When loading the leader from the quorum-state file don't assume that the leader is in the voter set.

> Response handling and request sending for voters RPCs
> -
>
> Key: KAFKA-16529
> URL: https://issues.apache.org/jira/browse/KAFKA-16529
> Project: Kafka
> Issue Type: Sub-task
> Components: kraft
> Reporter: José Armando García Sancio
> Assignee: José Armando García Sancio
> Priority: Major
> Fix For: 3.8.0
>
> Implement response handling and request sending for the following RPCs:
> # Vote
> # BeginQuorumEpoch
> # EndQuorumEpoch
> # Fetch
> # FetchSnapshot
> When loading the leader from the quorum-state file don't assume that the leader is in the voter set.
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1619478117 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java: ## @@ -26,9 +26,9 @@ */ public interface GroupSpec { /** - * @return Member metadata keyed by member Id. + * @return Member subscription metadata keyed by member Id. */ -Map members(); +Map memberSubscriptions(); Review Comment: discussed offline
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
chia7712 commented on PR #15999: URL: https://github.com/apache/kafka/pull/15999#issuecomment-2138240570 Just a reminder that today is the deadline for feature freeze. If this PR is necessary for 3.8.0, we need to cherry-pick it to branch-3.8.
Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]
apourchet commented on code in PR #16123: URL: https://github.com/apache/kafka/pull/16123#discussion_r1619452846 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster fullMetadata, final Optional userTaskAssignor = userTaskAssignorSupplier.get(); +UserTaskAssignmentListener userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> { }; Review Comment: You can't change it once you make it final.
Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]
loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1619416340

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java: ##
@@ -288,7 +301,31 @@ private void forwardInternal(final ProcessorNode child,
                              final Record record) {
         setCurrentNode(child);

-        child.process(record);
+        try {

Review Comment:
@cadonna Catching exceptions right here: https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L847 was our very first intention before we decided to move it to `ProcessorContextImpl#forwardInternal`. Unless we're wrong, we cannot get the precise name of the node where the exception occurred at the `StreamTask#doProcess` level.

Could we:
- catch exceptions at the `StreamTask#doProcess` level
- catch exceptions at the `ProcessorContextImpl#forwardInternal` level, but rather than rethrowing a `StreamsException` (https://github.com/loicgreffier/kafka/blob/960c2a3153b30e48963387b5756b0310275bf48b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L320), throw a `new ChildNodeProcessingException(child.name, e)` that is caught and handled at the `StreamTask#doProcess` level.

`ChildNodeProcessingException` would be a new exception acting as a wrapper of the root cause that additionally provides the name of the child node in which the exception occurred, letting the first node know that something happened in one of its children.

Hope it is clear. If it sounds like a plan to you, we can give this implementation a try.

_EDIT_: We might have to call `processingExceptionHandler#handle` in `ProcessorContextImpl#forwardInternal` to know whether processing should FAIL or CONTINUE at this level. Otherwise, a `ChildNodeProcessingException` will be thrown in all cases and FAIL or CONTINUE will be computed in `StreamTask#doProcess`, but in the CONTINUE case it is unclear how to resume the processing (maybe possible if `ChildNodeProcessingException` provides the failed child node name and the current state of the record).
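The wrapper-exception proposal above could look roughly like the following sketch. It is only an illustration of the idea under discussion: `ChildNodeProcessingException` is the name proposed in the comment and does not exist in Kafka, while `ProcessingSketch`, `forward`, and `process` are hypothetical stand-ins for `ProcessorContextImpl#forwardInternal` and `StreamTask#doProcess`, with plain `Runnable`s in place of processor nodes.

```java
/** Hypothetical sketch of the proposed wrapper exception; not Kafka Streams code. */
public class ProcessingSketch {

    /** Wraps the root cause and records the name of the child node that failed. */
    public static class ChildNodeProcessingException extends RuntimeException {
        private final String childNodeName;

        public ChildNodeProcessingException(String childNodeName, Throwable cause) {
            super("Exception caught while processing in node " + childNodeName, cause);
            this.childNodeName = childNodeName;
        }

        public String childNodeName() {
            return childNodeName;
        }
    }

    /** Stand-in for forwardInternal: wrap a failure with the failing child's name. */
    public static void forward(String childName, Runnable childProcess) {
        try {
            childProcess.run();
        } catch (ChildNodeProcessingException e) {
            // Already wrapped by a deeper node; keep the innermost node name.
            throw e;
        } catch (RuntimeException e) {
            throw new ChildNodeProcessingException(childName, e);
        }
    }

    /** Stand-in for doProcess: the single place where the failure is handled. */
    public static String process(Runnable topology) {
        try {
            topology.run();
            return "ok";
        } catch (ChildNodeProcessingException e) {
            return "failed in " + e.childNodeName() + ": " + e.getCause().getMessage();
        }
    }
}
```

Rethrowing an already-wrapped exception unchanged is one possible design choice: it keeps the name of the innermost failing node, which is the information the comment says is unavailable at the task level.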
Re: [PR] MINOR: Add more unit tests to LogSegments [kafka]
chia7712 commented on PR #16085: URL: https://github.com/apache/kafka/pull/16085#issuecomment-2138227399 @brandboat Could you please check the build error?
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
junrao commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1619445831

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
             "less than or equal to `log.retention.bytes` value.";
     public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;

+    public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
+    public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
+            "This is a global limit for all the partitions that are being copied from remote storage to local storage. " +
+            "The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be copied per second.";
+    public static final Long DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.copy.quota.window.num";
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote copy quota management. " +
+            "The default value is 61, which means there are 60 whole windows + 1 current window.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 61;
+
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = "remote.log.manager.copy.quota.window.size.seconds";
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for remote copy quota management. " +
+            "The default value is 1 second.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+    public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.fetch.max.bytes.per.second";
+    public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be fetched from remote storage to local storage per second. " +
+            "This is a global limit for all the partitions that are being fetched from remote storage to local storage. " +
+            "The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be fetched per second.";
+    public static final Long DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+    public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.fetch.quota.window.num";
+    public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote fetch quota management. " +
+            "The default value is 11, which means there are 10 whole windows + 1 current window.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM = 11;

Review Comment:
If there is no good reason, perhaps it's better to use the same default window number for copy too.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jeffkbkim commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619407866 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -53,8 +42,17 @@ * The assignment builder prioritizes the properties in the following order: * Balance > Stickiness. */ -public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { -private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +public class OptimizedUniformAssignmentBuilder { Review Comment: are we going to remove AbstractUniformAssignmentBuilder? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java: ## @@ -158,12 +159,11 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( -mkTopicAssignment(topic1Uuid, 0, 2), -mkTopicAssignment(topic3Uuid, 1) +mkTopicAssignment(topic1Uuid, 0), +mkTopicAssignment(topic3Uuid, 0, 1) )); expectedAssignment.put(memberB, mkAssignment( -mkTopicAssignment(topic1Uuid, 1), -mkTopicAssignment(topic3Uuid, 0) +mkTopicAssignment(topic1Uuid, 1, 2) Review Comment: I see some test cases where we have changed the expected assignment. why did it change? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. 
+int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. - * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. 
- */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int re
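The quota arithmetic in the new `buildAssignment()` hunk above (integer division for the minimum quota, modulo for the number of members that get one extra partition) can be illustrated with a small standalone sketch. This is not the PR's code: the class `UniformQuotaSketch` and the method `targetSizes` are hypothetical names, and handing the extra partitions to the first members in iteration order is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of OptimizedUniformAssignmentBuilder.
final class UniformQuotaSketch {

    // Returns the target number of partitions for each member, in iteration order.
    static List<Integer> targetSizes(final int totalPartitionsCount, final int numberOfMembers) {
        if (numberOfMembers <= 0) {
            throw new IllegalArgumentException("numberOfMembers must be positive");
        }
        // Every member receives at least this many partitions.
        final int minimumMemberQuota = totalPartitionsCount / numberOfMembers;
        // This many members receive exactly one partition more than the minimum.
        int remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;

        final List<Integer> sizes = new ArrayList<>(numberOfMembers);
        for (int i = 0; i < numberOfMembers; i++) {
            int quota = minimumMemberQuota;
            // Assumption: the first members in iteration order take the extras.
            if (remainingMembersToGetAnExtraPartition > 0) {
                quota++;
                remainingMembersToGetAnExtraPartition--;
            }
            sizes.add(quota);
        }
        return sizes;
    }

    public static void main(final String[] args) {
        // 7 partitions over 3 members: minimum quota 2, one member gets an extra.
        System.out.println(targetSizes(7, 3)); // prints [3, 2, 2]
    }
}
```

The sizes always sum to the total partition count and differ by at most one, which is the balance property the builder's Javadoc ranks above stickiness.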
Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]
ableegoldman merged PR #16123: URL: https://github.com/apache/kafka/pull/16123
Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]
ableegoldman commented on PR #16123: URL: https://github.com/apache/kafka/pull/16123#issuecomment-2138189584 Test failures are unrelated, merging to trunk
Re: [PR] MINOR: Fix rate metric spikes [kafka]
junrao merged PR #15889: URL: https://github.com/apache/kafka/pull/15889
Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]
ableegoldman commented on code in PR #16123: URL: https://github.com/apache/kafka/pull/16123#discussion_r1619409187 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -720,7 +727,7 @@ private void checkAllPartitions(final Set allSourceTopics, * Assigns a set of tasks to each client (Streams instance) using the configured task assignor, and also * populate the stateful tasks that have been assigned to the clients */ -private void assignTasksToClients(final Cluster fullMetadata, +private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetadata, Review Comment: nit: fix formatting (indentation of the lower parameters) Obviously you can address this in PR 9, no need to rerun the build on this PR ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster fullMetadata, final Optional userTaskAssignor = userTaskAssignorSupplier.get(); +UserTaskAssignmentListener userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> { }; if (userTaskAssignor.isPresent()) { final ApplicationState applicationState = buildApplicationState( taskManager.topologyMetadata(), clientMetadataMap, topicGroups, fullMetadata ); -final TaskAssignment taskAssignment = userTaskAssignor.get().assign(applicationState); +final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get(); +final TaskAssignment taskAssignment = assignor.assign(applicationState); processStreamsPartitionAssignment(clientMetadataMap, taskAssignment); +final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment); +userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> { +assignor.onAssignmentComputed(assignment, subscription, assignmentError); +}; Review Comment: nit: Streams coding style tip, use 
simplified lambdas, ie ```suggestion userTaskAssignmentListener = (assignment, subscription) -> assignor.onAssignmentComputed(assignment, subscription, assignmentError); ``` (I'm actually surprised checkstyle isn't complaining about this. Your IDE is probably at least telling you to simplify it I would guess) ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster fullMetadata, final Optional userTaskAssignor = userTaskAssignorSupplier.get(); +UserTaskAssignmentListener userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> { }; Review Comment: nit (address in PR 9): Streams coding style tip, prefer final variables whenever possible. So this would be like: ``` final UserTaskAssignmentListener listener; if (userTaskAssignor.isPresent()) { // do stuff listener = ...; } else { // do other stuff listener = (a, s) -> { }; } ``` ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultApplicationState.java: ## @@ -32,22 +33,29 @@ import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; -public class ApplicationStateImpl implements ApplicationState { +public class DefaultApplicationState implements ApplicationState { private final AssignmentConfigs assignmentConfigs; -private final Set tasks; +private final Map tasks; private final Map clientStates; -public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs, -final Set tasks, -final Map clientStates) { +private final Map> cachedKafkaStreamStates; + +public DefaultApplicationState(final AssignmentConfigs assignmentConfigs, + final Set tasks, + final Map clientStates) { this.assignmentConfigs = assignmentConfigs; -this.tasks = unmodifiableSet(tasks); +this.tasks = unmodifiableMap(tasks.stream().collect(Collectors.toMap(TaskInfo::id, task -> 
task))); Review Comment: We can avoid adding yet another `.stream().collect() ` by building up the map with the TaskIds as keys when we're building the `Set tasks` parameter in the caller of this constructor. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultApplicationS
Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]
loicgreffier commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1619416340 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java: ## @@ -288,7 +301,31 @@ private void forwardInternal(final ProcessorNode child, final Record record) { setCurrentNode(child); -child.process(record); +try { Review Comment: @cadonna Catching exceptions right here: https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L847 was our first intention, before we decided to move the catch to `ProcessorContextImpl#forwardInternal`. Unless we're wrong, we cannot get the precise name of the node where the exception occurred at the `StreamTask#doProcess` level. Could we: - catch exceptions at the `StreamTask#doProcess` level - also catch exceptions at the `ProcessorContextImpl#forwardInternal` level, but rather than rethrowing a `StreamsException` (https://github.com/loicgreffier/kafka/blob/960c2a3153b30e48963387b5756b0310275bf48b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L320), throw a `new ChildNodeProcessingException(child.name, e)` that is caught and handled at the `StreamTask#doProcess` level. `ChildNodeProcessingException` would be a new exception wrapping the root cause that additionally provides the name of the child node in which the exception occurred, letting the first node know that something happened in one of its children. Hope this is clear. If it sounds like a plan to you, we can give this implementation a try.
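The wrap-and-unwrap idea proposed in this comment could look roughly like the standalone sketch below. It is only an illustration under stated assumptions: `ChildNodeProcessingException` is the name proposed in the comment and does not exist in Kafka, and `forwardInternal` and `doProcess` here are simplified stand-ins for `ProcessorContextImpl#forwardInternal` and `StreamTask#doProcess`, not the real methods. Keeping the innermost node name when several nodes are nested is also an assumption.

```java
// Hypothetical sketch; none of these classes or methods are Kafka APIs.
final class ChildNodeProcessingException extends RuntimeException {
    private final String childNodeName;

    ChildNodeProcessingException(final String childNodeName, final Throwable cause) {
        super("Exception caught while processing in node " + childNodeName, cause);
        this.childNodeName = childNodeName;
    }

    String childNodeName() {
        return childNodeName;
    }
}

final class ForwardingSketch {

    // Stand-in for ProcessorContextImpl#forwardInternal: wraps any failure of the
    // child with the child's node name instead of handling it here.
    static void forwardInternal(final String childName, final Runnable childProcess) {
        try {
            childProcess.run();
        } catch (final ChildNodeProcessingException e) {
            // Already wrapped by a node further downstream: keep the innermost name.
            throw e;
        } catch (final RuntimeException e) {
            throw new ChildNodeProcessingException(childName, e);
        }
    }

    // Stand-in for StreamTask#doProcess: the single place where the wrapped
    // exception is caught, so the handler sees both the node name and the root cause.
    static String doProcess(final Runnable sourceProcess) {
        try {
            sourceProcess.run();
            return "ok";
        } catch (final ChildNodeProcessingException e) {
            return "failed in " + e.childNodeName() + ": " + e.getCause().getMessage();
        }
    }

    public static void main(final String[] args) {
        final String result = doProcess(() ->
            forwardInternal("map-node", () ->
                forwardInternal("filter-node", () -> {
                    throw new IllegalStateException("boom");
                })));
        System.out.println(result); // prints failed in filter-node: boom
    }
}
```

The point of the design is that the task-level handler stays the only place that decides what to do with a failure, while the forwarding layer contributes just the node name.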
Re: [PR] KAFKA-16105: Reassignment fix [kafka]
AnatolyPopov commented on code in PR #15165: URL: https://github.com/apache/kafka/pull/15165#discussion_r1619366626 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ## @@ -254,6 +259,61 @@ public void testCanProcessRecord() throws InterruptedException { assertEquals(3, handler.metadataCounter); } +@Test +public void testCanReprocessSkippedRecords() throws InterruptedException { +final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg"); +final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); +final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1)); +final TopicIdPartition tpId3 = new TopicIdPartition(topicId, new TopicPartition("sample", 3)); +assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1)); +assertNotEquals(partitioner.metadataPartition(tpId3), partitioner.metadataPartition(tpId0)); + +final int metadataPartition = partitioner.metadataPartition(tpId0); +final int anotherMetadataPartition = partitioner.metadataPartition(tpId3); + +// Mocking the consumer to be able to wait for the second reassignment +doAnswer(invocation -> { Review Comment: Totally agree about the mock-of-a-mock approach, I don't like it either. But hopefully it's good enough for now and it helps to focus on changes related specifically to this bug in this PR.
Re: [PR] MINOR: Fix rate metric spikes [kafka]
emitskevich-blp commented on PR #15889: URL: https://github.com/apache/kafka/pull/15889#issuecomment-2138093800 > Are the failed tests related to this PR, especially the following one? Not related, they are all flaky tests
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619346921 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: ``` val defaultValue = defaultVersionString match { case Some(versionString) => MetadataVersion.fromVersionString(versionString) case None => MetadataVersion.LATEST_PRODUCTION } val releaseVersionTag = Option(namespace.getString("release_version")) val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME) (releaseVersionTag, featureTag) match { case (Some(_), Some(_)) => // We should throw an error before we hit this case, but include for completeness throw new IllegalArgumentException("Both --release_version and --feature were set. 
Only one of the two flags can be set.") case (Some(version), None) => MetadataVersion.fromVersionString(version) case (None, Some(level)) => MetadataVersion.fromFeatureLevel(level) case (None, None) => defaultValue } ``` ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != nu
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619344585 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: We only use the passed in metadata version for defaults if --release-version is specified. If version default is specified, we don't use the replication configs. ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). 
build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619344585 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: We only use the passed in metadata version for defaults if --version-default is specified. If version default is specified, we don't use the replication configs.
[PR] MINOR: make public the consumer group migration policy config [kafka]
dongnuo123 opened a new pull request, #16128: URL: https://github.com/apache/kafka/pull/16128 This patch exposes the group coordinator config `CONSUMER_GROUP_MIGRATION_POLICY_CONFIG`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619332926 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: Thanks for the reply. Got it now. We pass in INTER_BROKER_PROTOCOL_VERSION_CONFIG as the default when calling `getMetadataVersion`. But that config shouldn't impact the MV used for selecting other features. ``` val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) ```
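The precedence discussed in this thread (an explicit release version or an explicit metadata.version feature level wins, the two are mutually exclusive, and the configured inter-broker protocol version only serves as a fallback default) can be sketched in a few lines of standalone Java. This is a hedged illustration, not StorageTool's code: the class `MetadataVersionResolutionSketch`, the method `resolve`, the `LATEST_PRODUCTION` placeholder string, and the tagged-string return values are all assumptions, and the version strings in `main` are illustrative only.

```java
import java.util.Optional;

// Hypothetical illustration of the resolution order; not Kafka's StorageTool.
final class MetadataVersionResolutionSketch {

    // Placeholder for "whatever the latest production metadata.version is".
    static final String LATEST_PRODUCTION = "latest-production";

    static String resolve(final Optional<String> releaseVersion,
                          final Optional<Short> metadataFeatureLevel,
                          final Optional<String> configuredDefault) {
        // The two flags are mutually exclusive.
        if (releaseVersion.isPresent() && metadataFeatureLevel.isPresent()) {
            throw new IllegalArgumentException(
                "Both --release-version and --feature were set. Only one of the two flags can be set.");
        }
        // An explicit release version is used as given.
        if (releaseVersion.isPresent()) {
            return "version:" + releaseVersion.get();
        }
        // An explicit metadata.version feature level is used as given.
        if (metadataFeatureLevel.isPresent()) {
            return "level:" + metadataFeatureLevel.get();
        }
        // Neither flag set: fall back to the configured default, else the latest production version.
        return configuredDefault.map(v -> "version:" + v).orElse(LATEST_PRODUCTION);
    }

    public static void main(final String[] args) {
        // Only the configured default is available, so it is used.
        System.out.println(resolve(Optional.empty(), Optional.empty(), Optional.of("3.7-IV4")));
        // Nothing is specified anywhere, so the placeholder default is used.
        System.out.println(resolve(Optional.empty(), Optional.empty(), Optional.empty()));
    }
}
```

Whether the resolved value should also drive the defaults of the other features is exactly the open question in the comments above; the sketch deliberately stops at resolving the metadata version itself.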
Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]
loicgreffier commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1619321278 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java: ## @@ -764,6 +766,7 @@ public boolean process(final long wallClockTime) { // get the next record to process record = partitionGroup.nextRecord(recordInfo, wallClockTime); +rawRecord = partitionGroup.rawHeadRecord(); Review Comment: We did not really consider it, because adding a new attribute to `ProcessorRecordContext` looked more invasive to us (basically more code and unit test changes), but we can definitely reconsider it. We'll also need to update the production exception handling (https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L223) according to KIP-1033, and having the raw record in the `ProcessorRecordContext` would help us with that part. Let us try that!