[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r610185971 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -2393,6 +2407,10 @@ class Log(@volatile private var _dir: File, } // okay we are safe now, remove the swap suffix sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, "")) + + // If not recovered swap file we need to increment logStartOffset here. Otherwise, we do this when loading the log. + if (!isRecoveredSwapFile) Review comment: Or rather, my test remembers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12636) Ensure retention still enforced for compacted topics if cleaning is not enabled
Jason Gustafson created KAFKA-12636:
---
Summary: Ensure retention still enforced for compacted topics if cleaning is not enabled
Key: KAFKA-12636
URL: https://issues.apache.org/jira/browse/KAFKA-12636
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson

We rely on a periodic task in LogManager to delete old segments of non-compacted topics which either have breached retention time or which have been explicitly deleted by a call to DeleteRecords. For compacted topics, we rely on the cleaning task itself to do the same since a compacted topic may also be configured with "delete" retention.

If log cleaning is not enabled, we still need to enforce retention semantics for compacted topics, but the current logic in LogManager excludes them from consideration:

{code}
// clean current logs.
val deletableLogs = {
  if (cleaner != null) {
    // prevent cleaner from working on same partitions when changing cleanup policy
    cleaner.pauseCleaningForNonCompactedPartitions()
  } else {
    currentLogs.filter {
      case (_, log) => !log.config.compact
    }
  }
}
{code}

It seems to me that we should remove the filtering when log cleaning is not enabled. The logic in `deleteOldSegments` will ensure that only the appropriate retention checks are made based on the topic configuration.

Of course it's kind of weird for a user to have a compacted topic when the cleaner is not enabled in the first place.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
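The selection change proposed in KAFKA-12636 above can be illustrated with a small Java sketch. It models only the branch where the cleaner is disabled. `LogPolicy` and `RetentionSketch` are hypothetical names invented for this illustration, not Kafka classes, and the two boolean flags stand in for a topic's cleanup policy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a log and its cleanup policy; not Kafka's Log/LogConfig.
final class LogPolicy {
    final String name;
    final boolean compact; // policy includes "compact"
    final boolean delete;  // policy includes "delete"

    LogPolicy(String name, boolean compact, boolean delete) {
        this.name = name;
        this.compact = compact;
        this.delete = delete;
    }
}

final class RetentionSketch {
    // Behaviour described in the issue: with no cleaner, compacted logs are filtered out.
    static List<String> currentSelection(List<LogPolicy> logs) {
        List<String> selected = new ArrayList<>();
        for (LogPolicy log : logs) {
            if (!log.compact) {
                selected.add(log.name);
            }
        }
        return selected;
    }

    // Proposal from the issue: drop the filter and let the per-log retention
    // logic decide which checks apply based on the topic configuration.
    static List<String> proposedSelection(List<LogPolicy> logs) {
        List<String> selected = new ArrayList<>();
        for (LogPolicy log : logs) {
            selected.add(log.name);
        }
        return selected;
    }
}
```

With one delete-only log and two compacted logs, `currentSelection` returns only the delete-only log, while `proposedSelection` hands all three to the retention logic.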
[jira] [Assigned] (KAFKA-12502) Quorum controller should return topic configs in CreateTopic response
[ https://issues.apache.org/jira/browse/KAFKA-12502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn reassigned KAFKA-12502: - Assignee: Ryan Dielhenn > Quorum controller should return topic configs in CreateTopic response > - > > Key: KAFKA-12502 > URL: https://issues.apache.org/jira/browse/KAFKA-12502 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Ryan Dielhenn >Priority: Major > Labels: kip-500 > > Configs were added to the response in version 5. > {code} > { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": > "5+", "nullableVersions": "5+", "ignorable": true, > "about": "Configuration of the topic.", "fields": [ > { "name": "Name", "type": "string", "versions": "5+", > "about": "The configuration name." }, > { "name": "Value", "type": "string", "versions": "5+", > "nullableVersions": "5+", > "about": "The configuration value." }, > { "name": "ReadOnly", "type": "bool", "versions": "5+", > "about": "True if the configuration is read-only." }, > { "name": "ConfigSource", "type": "int8", "versions": "5+", > "default": "-1", "ignorable": true, > "about": "The configuration source." }, > { "name": "IsSensitive", "type": "bool", "versions": "5+", > "about": "True if this configuration is sensitive." } > ]} > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610209134

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
## @@ -38,20 +48,36 @@
 private final String otherWindowName;
 private final long joinBeforeMs;
 private final long joinAfterMs;
+private final long joinGraceMs;
 private final ValueJoinerWithKey joiner;
 private final boolean outer;
+private final Optional outerJoinWindowName;
+private final boolean thisJoin;
+
+// Observed time is AtomicLong because this time is shared between the left and side processor nodes. However,
+// this time is not updated in parallel, so we can call get() several times without worry about getting different
+// times.
+private final AtomicLong maxObservedStreamTime;

Review comment:
I personally was on the side of always using task stream time everywhere, but more people feel that we should use processor stream time :P Anyway, all I'm trying to say is that we need to make an educated decision here. Whether we conclude that 1) we rely on task time here but still use processor time in other expiration logic, 2) we rely on processor time in all logic, or 3) we rely on task time in all logic, we should have a good rationale for whichever we choose.
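As a rough illustration of the shared observed time discussed in the comment above, the following Java sketch shows one way two processors could advance a common maximum timestamp through an `AtomicLong`. `SharedStreamTime` and its initial value are assumptions made for this example, not code from the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder shared by both sides of a join; not the PR's class.
final class SharedStreamTime {
    // Assumed starting point so that any real timestamp advances the time.
    private final AtomicLong maxObservedStreamTime = new AtomicLong(Long.MIN_VALUE);

    // Advances the shared time to the larger of its current value and the
    // record timestamp, then returns the resulting value.
    long observe(long recordTimestampMs) {
        return maxObservedStreamTime.updateAndGet(t -> Math.max(t, recordTimestampMs));
    }

    long get() {
        return maxObservedStreamTime.get();
    }
}
```

Both join sides would hold the same `SharedStreamTime` instance, so a late record on one side never moves the time backwards for the other.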
[GitHub] [kafka] guozhangwang opened a new pull request #10508: KAFKA-12633: Remove deprecated APIs in TopologyTestDriver
guozhangwang opened a new pull request #10508:
URL: https://github.com/apache/kafka/pull/10508

As well as related test classes.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang commented on pull request #10508: KAFKA-12633: Remove deprecated APIs in TopologyTestDriver
guozhangwang commented on pull request #10508: URL: https://github.com/apache/kafka/pull/10508#issuecomment-816328916 ping @vvcephei for reviews.
[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
hachikuji commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r610235224 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } +public void addControlBatch(MemoryRecords records) { +appendLock.lock(); +try { +drainStatus = DrainStatus.STARTED; +completed.add(new CompletedBatch<>( +nextOffset, +null, Review comment: Why don't we use `Optional.empty`? ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } +public void addControlBatch(MemoryRecords records) { +appendLock.lock(); +try { +drainStatus = DrainStatus.STARTED; Review comment: It would be useful to factor out a `flush()` API. We may have additional use cases in the future. ```java public void flush() { appendLock.lock(); try { drainStatus = DrainStatus.STARTED; maybeCompleteDrain(); } finally { appendLock.unlock(); } } ``` ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1876,12 +1836,12 @@ private void appendBatch( } private long maybeAppendBatches( -LeaderState state, +LeaderState state, long currentTimeMs ) { -long timeUnitFlush = accumulator.timeUntilDrain(currentTimeMs); +long timeUnitFlush = state.accumulator().timeUntilDrain(currentTimeMs); Review comment: While we're here, can we fix the name? It should be `timeUntilFlush`. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -,9 +2182,12 @@ public Long scheduleAtomicAppend(int epoch, List records) { return append(epoch, records, true); } +@SuppressWarnings("unchecked") Review comment: Seems this is not needed? 
## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } +public void addControlBatch(MemoryRecords records) { Review comment: Can we change this to `appendLeaderChangeMessage(LeaderChangeMessage)`? This would provide a stronger contract since it ensures that the batch is indeed a control batch, that its base offset is set consistently with `nextOffset`, and that it contains only one record as expected. It also would allow us to allocate the buffer used for the control batch from the `MemoryPool`. Then we wouldn't need to use the `null` values below when constructing the `CompletedBatch`, which avoids NPE potential. ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } +public void addControlBatch(MemoryRecords records) { +appendLock.lock(); +try { +drainStatus = DrainStatus.STARTED; +completed.add(new CompletedBatch<>( Review comment: I think we are assuming here that `currentBatch` is null. Although that is guaranteed to be the case for this specific usage in `KafkaRaftClient`, we should try to give `BatchAccumulator` a stronger contract. To fix this, we can just call `maybeCompleteDrain` before doing anything else. By the way, we should also have unit tests for these scenarios. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1859,15 +1819,15 @@ private void appendBatch( offsetAndEpoch.offset + 1, Integer.MAX_VALUE); future.whenComplete((commitTimeMs, exception) -> { -int numRecords = batch.records.size(); +int numRecords = batch.records.get().size(); Review comment: We should try to avoid blind calls to `get()`. I'm honestly not too sure how this even works for the case of a control batch. Maybe we are just ignoring the error? 
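The remark above about avoiding blind calls to `get()` can be sketched in Java as follows. `BatchSizeSketch` is a hypothetical helper, and treating an absent record list as a count of zero is an assumption made only for illustration. Whether that is the right behaviour for a control batch is exactly the open question in the review.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper; the record type is simplified to String.
final class BatchSizeSketch {
    // Returns the record count, or 0 when the batch carries no record list,
    // instead of calling records.get() and risking NoSuchElementException.
    static int numRecords(Optional<List<String>> records) {
        return records.map(List::size).orElse(0);
    }
}
```

If an absent list should be treated as a bug rather than as an empty batch, `orElseThrow` with a descriptive exception would make that explicit.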
## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -67,6 +75,39 @@ protected LeaderState( } this.grantingVoters.addAll(grantingVoters); this.log = logContext.logger(LeaderState.class); +this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +} + +public BatchAccumulator accumulator() { +return this.accumulator; +} + +private static List convertToVoters(Set voterIds) { +return voterIds.stream() +.map(follower -> new Voter().setVot
[GitHub] [kafka] hachikuji commented on a change in pull request #10343: KAFKA-12471: Implement createPartitions in KIP-500 mode
hachikuji commented on a change in pull request #10343: URL: https://github.com/apache/kafka/pull/10343#discussion_r610263157 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1007,6 +999,128 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) { return ControllerResult.of(records, null); } +ControllerResult> +createPartitions(List topics) { +List records = new ArrayList<>(); +List results = new ArrayList<>(); +for (CreatePartitionsTopic topic : topics) { +ApiError apiError = ApiError.NONE; +try { +createPartitions(topic, records); +} catch (ApiException e) { +apiError = ApiError.fromThrowable(e); +} catch (Exception e) { +log.error("Unexpected createPartitions error for {}", topic, e); +apiError = ApiError.fromThrowable(e); +} +results.add(new CreatePartitionsTopicResult(). +setName(topic.name()). +setErrorCode(apiError.error().code()). +setErrorMessage(apiError.message())); +} +return new ControllerResult<>(records, results, true); +} + +void createPartitions(CreatePartitionsTopic topic, + List records) { +Uuid topicId = topicsByName.get(topic.name()); +if (topicId == null) { +throw new UnknownTopicOrPartitionException(); +} +TopicControlInfo topicInfo = topics.get(topicId); +if (topicInfo == null) { +throw new UnknownTopicOrPartitionException(); +} +if (topic.count() == topicInfo.parts.size()) { Review comment: I guess this logic is consistent with the current implementation. It might have been nice to make this an idempotent operation. 
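To make the idempotency remark above concrete, here is a minimal Java sketch of what an idempotent partition-count check might look like. The quoted diff is cut off at the equality check, so what the PR does in that branch is not shown here. `PartitionCountSketch` and `partitionsToAdd` are hypothetical names, not controller code.

```java
// Hypothetical helper illustrating an idempotent variant of the count check.
final class PartitionCountSketch {
    // Returns how many partitions must be created to reach the requested total.
    // A request for the current count is a no-op (0) rather than an error.
    static int partitionsToAdd(int currentCount, int requestedCount) {
        if (requestedCount < currentCount) {
            // Kafka only allows the partition count of a topic to grow.
            throw new IllegalArgumentException(
                "Requested " + requestedCount + " partitions, but topic already has " + currentCount);
        }
        return requestedCount - currentCount;
    }
}
```

Under this variant, retrying the same request after a timeout would succeed without creating anything new.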
## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -538,4 +542,65 @@ class ControllerApis(val requestChannel: RequestChannel, } }) } + + def handleCreatePartitions(request: RequestChannel.Request): Unit = { +val future = createPartitions(request.body[CreatePartitionsRequest].data, + authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n)) +future.whenComplete((responses, exception) => { + if (exception != null) { +requestHelper.handleError(request, exception) + } else { +requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + val responseData = new CreatePartitionsResponseData(). +setResults(responses). +setThrottleTimeMs(requestThrottleMs) + new CreatePartitionsResponse(responseData) +}) + } +}) + } + + def createPartitions(request: CreatePartitionsRequestData, + hasClusterAuth: Boolean, + getCreatableTopics: Iterable[String] => Set[String]) + : CompletableFuture[util.List[CreatePartitionsTopicResult]] = { +val responses = new util.ArrayList[CreatePartitionsTopicResult]() +val duplicateTopicNames = new util.HashSet[String]() +val topicNames = new util.HashSet[String]() +request.topics().forEach { + topic => +if (!topicNames.add(topic.name())) { + duplicateTopicNames.add(topic.name()) +} +} +duplicateTopicNames.forEach { topicName => + responses.add(new CreatePartitionsTopicResult(). +setName(topicName). +setErrorCode(INVALID_REQUEST.code()). 
+setErrorMessage("Duplicate topic name.")) +topicNames.remove(topicName) +} +val authorizedTopicNames = { + if (hasClusterAuth) { +topicNames.asScala + } else { +getCreatableTopics(topicNames.asScala) + } +} +val topics = new util.ArrayList[CreatePartitionsTopic] +topicNames.forEach { Review comment: nit: `topicNames.forEach { topicName =>` ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1007,6 +999,128 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) { return ControllerResult.of(records, null); } +ControllerResult> +createPartitions(List topics) { +List records = new ArrayList<>(); +List results = new ArrayList<>(); +for (CreatePartitionsTopic topic : topics) { +ApiError apiError = ApiError.NONE; +try { +createPartitions(topic, records); +} catch (ApiException e) { +apiError = ApiError.fromThrowable(e); +} catch (Exception e) { +log.error("Unexpected createPartitions error for {}", topic, e); +apiError = ApiError.fromThrowable(e
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610269446

## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
## @@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
In the constructor I used `Optional.ofNullable(records);`. I will change it so that it is easier to read.
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610280789

## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
## @@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
If `Optional.empty` is used, the type argument cannot be inferred for CompletedBatch. I got around this by using `Optional.ofNullable` in the constructor of CompletedBatch.
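For the inference problem mentioned above, one general Java option is to state the type explicitly, either with a type witness on `Optional.empty` or through a small generic factory. The sketch below uses `CompletedBatchSketch`, a hypothetical and much simplified stand-in for the PR's `CompletedBatch`. Whether inference actually fails in the PR depends on constructor signatures that are not shown in this thread.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for the CompletedBatch in the PR.
final class CompletedBatchSketch<T> {
    final long baseOffset;
    final Optional<List<T>> records;

    CompletedBatchSketch(long baseOffset, Optional<List<T>> records) {
        this.baseOffset = baseOffset;
        this.records = records;
    }

    // Factory for a batch without a record list; T comes from the call site.
    static <T> CompletedBatchSketch<T> withoutRecords(long baseOffset) {
        return new CompletedBatchSketch<>(baseOffset, Optional.empty());
    }

    // Same result with an explicit type witness instead of relying on inference.
    static CompletedBatchSketch<String> withoutStringRecords(long baseOffset) {
        return new CompletedBatchSketch<String>(baseOffset, Optional.<List<String>>empty());
    }
}
```

Either form keeps `null` out of the call site, which is the concern raised earlier in the review.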
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r610281681 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } +public void addControlBatch(MemoryRecords records) { +appendLock.lock(); +try { +drainStatus = DrainStatus.STARTED; +completed.add(new CompletedBatch<>( +nextOffset, +null, Review comment: Using `Optional.ofNullable` saved the time of having to change every other time a CompletedBatch is constructed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r610281681 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } +public void addControlBatch(MemoryRecords records) { +appendLock.lock(); +try { +drainStatus = DrainStatus.STARTED; +completed.add(new CompletedBatch<>( +nextOffset, +null, Review comment: Using `Optional.ofNullable` saved the time of having to change every other instantiation of CompletedBatch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r610281681 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } +public void addControlBatch(MemoryRecords records) { +appendLock.lock(); +try { +drainStatus = DrainStatus.STARTED; +completed.add(new CompletedBatch<>( +nextOffset, +null, Review comment: Using `Optional.ofNullable` in the constructor saved the time of having to change every other instantiation of CompletedBatch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12464) Enhance constrained sticky Assign algorithm
[ https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-12464:
--
Description:
In KAFKA-9987, we made a great improvement for the case when all consumers are subscribed to the same set of topics. The algorithm contains 4 phases:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 # Fill remaining members up to minQuota
 # If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity
 # Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we should just distribute one partition each to all consumers at min capacity

Take an example for better understanding:

*example:*

Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, ... t1p9

Suppose the current assignment is:

_C0: t1p0, t1p1, t1p2, t1p3, t1p4_
_C1: t1p5, t1p6, t1p7, t1p8, t1p9_

Now a new consumer, C2, is added, so we'll do:
 # Reassign as many previously owned partitions as possible, up to the maxQuota

After this phase, the assignment will be: (maxQuota will be 4)

_C0: t1p0, t1p1, t1p2, t1p3_
_C1: t1p5, t1p6, t1p7, t1p8_
 # Fill remaining members up to minQuota

After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2, t1p3_
_C1: t1p5, t1p6, t1p7, t1p8_
_C2: t1p4, t1p9_
 # If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity

After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2_
_C1: t1p5, t1p6, t1p7, t1p8_
_C2: t1p4, t1p9, t1p3_
 # Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we should just distribute one partition each to all consumers at min capacity

As we can see, we need 3 phases to complete the assignment. But we can actually complete it in 2 phases. Here's the updated algorithm:
 # Reassign as many previously owned partitions as possible, up to the maxQuota, while also respecting numMaxQuota, which is the remainder of (Partitions / Consumers)
 # Fill remaining members up to maxQuota if the current maxQuotaMember < numMaxQuota, otherwise up to minQuota

By taking numMaxQuota into account, the original step 1 is no longer so aggressive that it assigns too many partitions to consumers, and step 2 is no longer so conservative that it assigns too few, so we don't need step 3 and step 4 to balance them.

{{So, the updated pseudo-code sketch of the algorithm:}}

C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity
*C_r := (P%N) the allowed number of members with C_c partitions assigned*
*num_max_capacity_members := current number of members with C_c partitions assigned (default to 0)*
members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
-max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty-
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible, *by considering the num_max_capacity_members*
for member : members
    remove any partitions that are no longer in the subscription from its owned partitions
    remove all owned_partitions if the generation is old
    if member.owned_partitions.size < C_f
        assign all owned partitions to member and remove from unassigned_partitions
        add member to unfilled_members
    -else if member.owned_partitions.size == C_f-
        -assign first C_f owned_partitions to member and remove from unassigned_partitions-
    else if member.owned_partitions.size >= C_c *&& num_max_capacity_members < C_r*
        *assign first C_c owned_partitions to member and remove from unassigned_partitions*
        *num_max_capacity_members++*
        -add member to max_capacity_members-
    *else*
        *assign first C_f owned_partitions to member and remove from unassigned_partitions*

sort unassigned_partitions in partition order, ie t0_p0, t1_p0, t2_p0, t0_p1, t1_p0 (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members *up to the C_r numbers of C_c, otherwise, to C_f*
for member : unfilled_members
    compute the remaining capacity as -C = C_f - num_assigned_partitions-
    if num_max_capacity_members < C_r:
        C = C_c - num_assigned_partitions
        num_max_capacity_members++
    else
        C = C_f - num_assigned_partitions
    pop the first C partitions from unassig
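The two-pass idea described in KAFKA-12464 above can be sketched in Java. This is a minimal illustration under stated assumptions, not the assignor implementation: `ConstrainedStickySketch` is a hypothetical name, subscription and generation checks are skipped, owned partitions are assumed to be valid and disjoint, and at least one member is assumed. As a simplification of my own, members left at the floor capacity are also revisited in the second pass so that no partition stays unassigned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ConstrainedStickySketch {
    static Map<String, List<String>> assign(Map<String, List<String>> owned, List<String> partitions) {
        int n = owned.size();
        int floor = partitions.size() / n;           // C_f
        int ceil = (partitions.size() + n - 1) / n;  // C_c
        int allowedAtCeil = partitions.size() % n;   // C_r
        int atCeil = 0;                              // num_max_capacity_members

        List<String> unassigned = new ArrayList<>(partitions);
        Map<String, List<String>> sortedOwned = new TreeMap<>(owned); // member id order
        Map<String, List<String>> result = new TreeMap<>();
        List<String> unfilled = new ArrayList<>();

        // Pass 1: keep previously owned partitions, capped by C_f or C_c.
        for (Map.Entry<String, List<String>> entry : sortedOwned.entrySet()) {
            List<String> previous = entry.getValue();
            int keep;
            if (previous.size() < floor) {
                keep = previous.size();
            } else if (previous.size() >= ceil && atCeil < allowedAtCeil) {
                keep = ceil;
                atCeil++;
            } else {
                keep = floor;
            }
            List<String> kept = new ArrayList<>(previous.subList(0, keep));
            unassigned.removeAll(kept);
            result.put(entry.getKey(), kept);
            if (keep < ceil) {
                unfilled.add(entry.getKey()); // may still receive partitions
            }
        }

        // Pass 2: fill up to C_c while ceiling slots remain, otherwise to C_f.
        for (String member : unfilled) {
            List<String> current = result.get(member);
            int target = floor;
            if (atCeil < allowedAtCeil) {
                target = ceil;
                atCeil++;
            }
            while (current.size() < target && !unassigned.isEmpty()) {
                current.add(unassigned.remove(0));
            }
        }
        return result;
    }
}
```

For the example in the issue (C0 and C1 each owning five partitions, C2 newly joined), C_f is 3, C_c is 4 and C_r is 1, so C0 keeps t1p0 to t1p3, C1 keeps t1p5 to t1p7, and C2 receives t1p4, t1p8 and t1p9 without any stealing step.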
[jira] [Created] (KAFKA-12637) Remove deprecated PartitionAssignor interface
A. Sophie Blee-Goldman created KAFKA-12637: -- Summary: Remove deprecated PartitionAssignor interface Key: KAFKA-12637 URL: https://issues.apache.org/jira/browse/KAFKA-12637 Project: Kafka Issue Type: Improvement Components: consumer Reporter: A. Sophie Blee-Goldman Fix For: 3.0.0 In KIP-429, we deprecated the existing PartitionAssignor interface in order to move it out of the internals package and better align the name with other pluggable Consumer interfaces. We added an adapter to convert from existing o.a.k.clients.consumer.internals.PartitionAssignor to the new o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated interface. This was deprecated in 2.4, so we should be ok to remove it and the adaptor in 3.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12637) Remove deprecated PartitionAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-12637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12637: --- Labels: newbie newbie++ (was: ) > Remove deprecated PartitionAssignor interface > - > > Key: KAFKA-12637 > URL: https://issues.apache.org/jira/browse/KAFKA-12637 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Labels: newbie, newbie++ > Fix For: 3.0.0 > > > In KIP-429, we deprecated the existing PartitionAssignor interface in order > to move it out of the internals package and better align the name with other > pluggable Consumer interfaces. We added an adapter to convert from existing > o.a.k.clients.consumer.internals.PartitionAssignor to the new > o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated > interface. This was deprecated in 2.4, so we should be ok to remove it and > the adaptor in 3.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12637) Remove deprecated PartitionAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-12637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12637: --- Description: In KIP-429, we deprecated the existing PartitionAssignor interface in order to move it out of the internals package and better align the name with other pluggable Consumer interfaces. We added an adapter to convert from existing o.a.k.clients.consumer.internals.PartitionAssignor to the new o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated interface. This was deprecated in 2.4, so we should be ok to remove it and the PartitionAssignorAdaptor in 3.0 (was: In KIP-429, we deprecated the existing PartitionAssignor interface in order to move it out of the internals package and better align the name with other pluggable Consumer interfaces. We added an adapter to convert from existing o.a.k.clients.consumer.internals.PartitionAssignor to the new o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated interface. This was deprecated in 2.4, so we should be ok to remove it and the adaptor in 3.0) > Remove deprecated PartitionAssignor interface > - > > Key: KAFKA-12637 > URL: https://issues.apache.org/jira/browse/KAFKA-12637 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Labels: newbie, newbie++ > Fix For: 3.0.0 > > > In KIP-429, we deprecated the existing PartitionAssignor interface in order > to move it out of the internals package and better align the name with other > pluggable Consumer interfaces. We added an adapter to convert from existing > o.a.k.clients.consumer.internals.PartitionAssignor to the new > o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated > interface. This was deprecated in 2.4, so we should be ok to remove it and > the PartitionAssignorAdaptor in 3.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
kowshik commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r610293058 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * This class represents the in-memory state of segments associated with a leader epoch. This includes the mapping of offset to + * segment ids and unreferenced segments which are not mapped to any offset but they exist in remote storage. + * + * This is used by {@link RemoteLogMetadataCache} to track the segments for each leader epoch. 
+ */ +class RemoteLogLeaderEpochState { + +// It contains offset to segment ids mapping with the segment state as COPY_SEGMENT_FINISHED. +private final NavigableMap offsetToId = new ConcurrentSkipListMap<>(); + +/** + * It represents unreferenced segments for this leader epoch. It contains the segments still in COPY_SEGMENT_STARTED + * and DELETE_SEGMENT_STARTED state or these have been replaced by callers with other segments having the same + * start offset for the leader epoch. These will be returned by {@link RemoteLogMetadataCache#listAllRemoteLogSegments()} + * and {@link RemoteLogMetadataCache#listRemoteLogSegments(int leaderEpoch)} so that callers can clean them up if + * they still exist. These will be cleaned from the cache once they reach DELETE_SEGMENT_FINISHED state. + */ +private final Set unreferencedSegmentIds = ConcurrentHashMap.newKeySet(); + +// It represents the highest log offset of the segments that were updated with updateHighestLogOffset. +private volatile Long highestLogOffset; + +/** + * Returns all the segments associated with this leader epoch sorted by start offset in ascending order. + * + * @param idToSegmentMetadata mapping of id to segment metadata. This will be used to get RemoteLogSegmentMetadata + *for an id to be used for sorting. + */ +Iterator listAllRemoteLogSegments(Map idToSegmentMetadata) { +// Return all the segments including unreferenced metadata. +int size = offsetToId.size() + unreferencedSegmentIds.size(); +if (size == 0) { +return Collections.emptyIterator(); +} + +ArrayList metadataList = new ArrayList<>(size); +for (RemoteLogSegmentId id : offsetToId.values()) { +metadataList.add(idToSegmentMetadata.get(id)); Review comment: Hmm here we assume that `id` should be present in the provided `idToSegmentMetadata`. Due to programming error, or other reasons, the caller may not be able to ensure this. Would it be safer if we instead threw whenever `id` is absent in `idToSegmentMetadata` to catch that case? 
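The fail-fast lookup suggested in the review comment above could look roughly like the following. This is only a sketch under stated assumptions: `SegmentLookupSketch` and `resolveAll` are hypothetical names, generic `K`/`V` types stand in for `RemoteLogSegmentId` and `RemoteLogSegmentMetadata`, and `IllegalStateException` is an assumed choice, since the comment does not name an exception type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SegmentLookupSketch {
    // Hypothetical helper: resolves every id through the map and fails fast
    // when an id has no metadata, instead of silently adding null to the result.
    static <K, V> List<V> resolveAll(Iterable<K> ids, Map<K, V> idToMetadata) {
        List<V> result = new ArrayList<>();
        for (K id : ids) {
            V metadata = idToMetadata.get(id);
            if (metadata == null) {
                // Assumed exception type; the review comment only says "threw".
                throw new IllegalStateException("No metadata found for segment id: " + id);
            }
            result.add(metadata);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> idToMetadata = Map.of("id-1", "metadata-1");
        System.out.println(resolveAll(List.of("id-1"), idToMetadata));
    }
}
```

With a check like this, a missing id surfaces at the lookup rather than later as a `NullPointerException` while sorting or iterating the list.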
## File path: clients/src/test/java/org/apache/kafka/test/TestUtils.java ## @@ -535,4 +536,46 @@ public static void setFieldValue(Object obj, String fieldName, Object value) thr field.setAccessible(true); field.set(obj, value); } + +/** + * Returns true if both iterators have same elements in the same order. + * + * @param iterator1 first iterator. + * @param iterator2 second iterator. + * @paramtype of element in the iterators. + */ +public static boolean sameElementsWithOrder(Iterator iterator1, Review comment: Here is a slightly simpler version: ``` while (iterator1.hasNext() && iterator2.hasNext()) { if (!Objects.equals(iterator1.next(), iterator2.next())) { return false; } } return !iterator1.hasNext
[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
kowshik commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r610305248 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java ## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * This class provides an in-memory cache of remote log segment metadata. 
This maintains the lineage of segments + * with respect to leader epochs. + * + * Remote log segment can go through the state transitions as mentioned in {@link RemoteLogSegmentState}. + * + * This class will have all the segments which did not reach terminal state viz DELETE_SEGMENT_FINISHED. That means,any + * segment reaching the terminal state will get cleared from this instance. + * This class provides different methods to fetch segment metadata like {@link #remoteLogSegmentMetadata(int, long)}, + * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, {@link #listAllRemoteLogSegments()}. Those + * methods have different semantics to fetch the segment based on its state. + * + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}: + * + * Segment in this state indicates it is not yet copied successfully. So, these segments will not be + * accessible for reads but these are considered for cleanups when a partition is deleted. + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}: + * + * Segment in this state indicates it is successfully copied and it is available for reads. So, these segments + * will be accessible for reads. But this should be available for any cleanup activity like deleting segments by the + * caller of this class. + * + * + * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}: + * Segment in this state indicates it is getting deleted. That means, it is not available for reads. But it should be + * available for any cleanup activity like deleting segments by the caller of this class. + * + * + * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}: + * Segment in this state indicate it is already deleted. That means, it is not available for any activity including + * reads or cleanup activity. This cache will clear entries containing this state. + * + * + * + * + * The below table summarizes whether the segment with the respective state are available for the given methods. 
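+ *
+ * +--------------------------------+----------------------+-----------------------+------------------------+-------------------------+
+ * | Method / SegmentState          | COPY_SEGMENT_STARTED | COPY_SEGMENT_FINISHED | DELETE_SEGMENT_STARTED | DELETE_SEGMENT_FINISHED |
+ * |--------------------------------+----------------------+-----------------------+------------------------+-------------------------|
+ * | remoteLogSegmentMetadata       | No                   | Yes                   | No                     | No                      |
+ * | (int leaderEpoch, long offset) |                      |                       |                        |                         |
+ * |--------------------------------+----------------------+-----------------------+------------------------+-------------------------|
+ * | listRemoteLogSegments          | Yes                  | Yes                   | Yes                    | No                      |
+ * | (int leaderEpoch)              |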
[jira] [Created] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
A. Sophie Blee-Goldman created KAFKA-12638: -- Summary: Remove default implementation of ConsumerRebalanceListener#onPartitionsLost Key: KAFKA-12638 URL: https://issues.apache.org/jira/browse/KAFKA-12638 Project: Kafka Issue Type: Improvement Components: consumer Reporter: A. Sophie Blee-Goldman When we added the #onPartitionsLost callback to the ConsumerRebalanceListener in KIP-429, we gave it a default implementation that just invoked the existing #onPartitionsRevoked method for backwards compatibility. This is somewhat inconvenient, since we generally want #onPartitionsLost to skip committing offsets for those partitions, and committing offsets is exactly what #onPartitionsRevoked does. I don't think we can just remove it in 3.0, since we haven't indicated that the default implementation is deprecated or logged a warning that we intend to remove it in a future release (as we did for the RocksDBConfigSetter#close method in Streams, for example). We should try to add such a warning now, so we can remove the default in a future release.
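To illustrate the inconvenience described in this issue, here is a minimal sketch that does not use the real Kafka client classes. `Listener` is a hypothetical stand-in for ConsumerRebalanceListener with the same delegating default, partitions are plain strings, and "committing" is simulated by recording partition names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class RebalanceListenerSketch {
    // Hypothetical stand-in for ConsumerRebalanceListener (not the real interface).
    interface Listener {
        void onPartitionsRevoked(Collection<String> partitions);

        // Mirrors the default described in the issue: lost delegates to revoked.
        default void onPartitionsLost(Collection<String> partitions) {
            onPartitionsRevoked(partitions);
        }
    }

    // Relies on the default, so lost partitions are "committed" as well.
    static final class DefaultingListener implements Listener {
        final List<String> committed = new ArrayList<>();

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            committed.addAll(partitions); // simulated offset commit
        }
    }

    // Overrides onPartitionsLost explicitly to skip the simulated commit.
    static final class OverridingListener implements Listener {
        final List<String> committed = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            committed.addAll(partitions);
        }

        @Override
        public void onPartitionsLost(Collection<String> partitions) {
            dropped.addAll(partitions); // clean up local state only, no commit
        }
    }

    public static void main(String[] args) {
        DefaultingListener defaulting = new DefaultingListener();
        defaulting.onPartitionsLost(List.of("t1p0"));
        System.out.println("committed via default: " + defaulting.committed);
    }
}
```

An implementer who forgets the override silently gets the commit path for lost partitions, which is the behavior the issue wants to stop providing by default.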
[GitHub] [kafka] dengziming commented on pull request #10488: MINOR: Remove some unnecessary cyclomatic complexity suppressions
dengziming commented on pull request #10488: URL: https://github.com/apache/kafka/pull/10488#issuecomment-816368204 @chia7712 , Hello, PTAL, Thanks.
[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
[ https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317601#comment-17317601 ] Ben Chen commented on KAFKA-12638: -- Just curious: for an issue like this one, tagged with Major priority, who can work on it, especially if there are time constraints? Also, do we have some mechanism to earn credits so that someone with enough "credits" can work on certain things? > Remove default implementation of ConsumerRebalanceListener#onPartitionsLost > --- > > Key: KAFKA-12638 > URL: https://issues.apache.org/jira/browse/KAFKA-12638 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Major > > When we added the #onPartitionsLost callback to the ConsumerRebalanceListener > in KIP-429, we gave it a default implementation that just invoked the > existing #onPartitionsRevoked method for backwards compatibility. This is > somewhat inconvenient, since we generally want to invoke #onPartitionsLost in > order to skip the committing of offsets on revoked partitions, which is > exactly what #onPartitionsRevoked does. > I don't think we can just remove it in 3.0 since we haven't indicated that we > "deprecated" the default implementation or logged a warning that we intend to > remove the default in a future release (as we did for the > RocksDBConfigSetter#close method in Streams, for example). We should try to > add such a warning now, so we can remove it in a future release.
[GitHub] [kafka] C0urante commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown
C0urante commented on a change in pull request #10503: URL: https://github.com/apache/kafka/pull/10503#discussion_r610316462 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ## @@ -185,8 +185,12 @@ private void doRun() throws InterruptedException { execute(); } catch (Throwable t) { -log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t); -throw t; +if (!stopping && !cancelled) { Review comment: I was thinking we could log different messages based on whether only `stopping` or both `stopping` and `cancelled` were true. If `cancelled` is true, we should make sure to let people know that there might be a newer instance of this task already running, and that the log message isn't indicative that that newer instance has failed. If only `stopping` is true, then the existing log message should suffice. Does that make sense?
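One way to read the suggestion above is as a small function that picks the log message from the two flags. The sketch below is illustrative only: `TaskFailureLogSketch` and `failureMessage` are hypothetical names, and the wording of the `cancelled` and `stopping` messages is assumed rather than taken from the pull request. Only the final message is copied from the diff.

```java
final class TaskFailureLogSketch {
    // Chooses a log message from the task's shutdown flags.
    // The cancelled case is checked first because the comment treats it as the
    // more specific situation (a newer task instance may already be running).
    static String failureMessage(boolean stopping, boolean cancelled) {
        if (cancelled) {
            // Hypothetical wording, not from the pull request.
            return "Task threw an uncaught exception after being cancelled; a newer instance of this task "
                + "may already be running, and this error does not indicate that the newer instance has failed";
        }
        if (stopping) {
            // Hypothetical wording, not from the pull request.
            return "Task threw an uncaught exception while shutting down";
        }
        // Message text as it appears in the diff for a task that is still running.
        return "Task threw an uncaught and unrecoverable exception. Task is being killed and will not "
            + "recover until manually restarted";
    }

    public static void main(String[] args) {
        System.out.println(failureMessage(true, true));
        System.out.println(failureMessage(true, false));
        System.out.println(failureMessage(false, false));
    }
}
```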
[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator
[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317613#comment-17317613 ] Sagar Rao commented on KAFKA-8295: -- Thanks Sophie for the background. I do plan to run some benchmarks by extending the rocksjava benchmarks for my use case. I can share the performance numbers here, which might help us decide whether this should be considered. Yeah, this merge operator may not be applicable to most other state stores, so if it comes to a KIP for this, I would look at 617, which I have seen before while implementing KIP-614. Thanks for sharing that! > Optimize count() using RocksDB merge operator > - > > Key: KAFKA-8295 > URL: https://issues.apache.org/jira/browse/KAFKA-8295 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > In addition to regular put/get/delete RocksDB provides a fourth operation, > merge. This essentially provides an optimized read/update/write path in a > single operation. One of the built-in (C++) merge operators exposed over the > Java API is a counter. We should be able to leverage this for a more > efficient implementation of count() > > (Note: Unfortunately it seems unlikely we can use this to optimize general > aggregations, even if RocksJava allowed for a custom merge operator, unless > we provide a way for the user to specify and connect a C++ implemented > aggregator – otherwise we incur too much cost crossing the jni for a net > performance benefit)
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown
kpatelatwork commented on a change in pull request #10503: URL: https://github.com/apache/kafka/pull/10503#discussion_r610322098 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ## @@ -185,8 +185,12 @@ private void doRun() throws InterruptedException { execute(); } catch (Throwable t) { -log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t); -throw t; +if (!stopping && !cancelled) { Review comment: are these flags internal or can we just add "stopped={}, cancelled={} " to the message instead of adding more if/else to the code?
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown
kpatelatwork commented on a change in pull request #10503: URL: https://github.com/apache/kafka/pull/10503#discussion_r610322098 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ## @@ -185,8 +185,12 @@ private void doRun() throws InterruptedException { execute(); } catch (Throwable t) { -log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t); -throw t; +if (!stopping && !cancelled) { Review comment: @C0urante are these flags meant to be hidden or wdyt if we just add "stopped={}, cancelled={} " to the message instead of adding more if/else to the code?
[GitHub] [kafka] showuon opened a new pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon opened a new pull request #10509: URL: https://github.com/apache/kafka/pull/10509
[jira] [Commented] (KAFKA-12464) Enhance constrained sticky Assign algorithm
[ https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317615#comment-17317615 ] Luke Chen commented on KAFKA-12464: --- [~ableegoldman], good news, after my enhancement, the _testLargeAssignmentAndGroupWithUniformSubscription_ test time down from 28xx ms, to 18xx ms. Improved 33% of performance. PR is submitted. Thank you. > Enhance constrained sticky Assign algorithm > --- > > Key: KAFKA-12464 > URL: https://issues.apache.org/jira/browse/KAFKA-12464 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 2.7.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Labels: perfomance > > In KAFKA-9987, we did a great improvement for the case when all consumers > were subscribed to same set of topics. The algorithm contains 4 phases: > # Reassign as many previously owned partitions as possible, up to the > maxQuota > # Fill remaining members up to minQuota > # If we ran out of unassigned partitions before filling all consumers, we > need to start stealing partitions from the over-full consumers at max capacity > # Otherwise we may have run out of unfilled consumers before assigning all > partitions, in which case we should just distribute one partition each to all > consumers at min capacity > > Take an example for better understanding: > *example:* > Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, > ... 
t1p9
> Suppose the current assignment is:
> _C0: t1p0, t1p1, t1p2, t1p3, t1p4_
> _C1: t1p5, t1p6, t1p7, t1p8, t1p9_
> Now a new consumer, C2, is added, so we'll do:
> # Reassign as many previously owned partitions as possible, up to the maxQuota
> After this phase, the assignment will be: (maxQuota will be 4)
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7, t1p8_
> # Fill remaining members up to minQuota
> After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7, t1p8_
> _C2: t1p4, t1p9_
> # If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity
> After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2_
> _C1: t1p5, t1p6, t1p7, t1p8_
> _C2: t1p4, t1p9,_ _t1p3_
> # Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we should just distribute one partition each to all consumers at min capacity
>
>
> As we can see, we need 3 phases to complete the assignment. But we can actually complete it in 2 phases. Here's the updated algorithm:
> # Reassign as many previously owned partitions as possible, up to the maxQuota, while also considering numMaxQuota, which is the remainder of (Partitions / Consumers)
> # Fill remaining members up to maxQuota if current maxQuotaMember < numMaxQuota, otherwise up to minQuota
>
> By considering numMaxQuota, the original step 1 won't be so aggressive that it assigns too many partitions to consumers, and step 2 won't be so conservative that it assigns too few, so we don't need step 3 and step 4 to balance them.
> > {{So, the updated Pseudo-code sketch of the algorithm:}} > C_f := (P/N)_floor, the floor capacity > C_c := (P/N)_ceil, the ceiling capacity > *C_r := (P%N) the allowed number of members with C_c partitions assigned* > *num_max_capacity_members := current number of members with C_c partitions > assigned (default to 0)* > members := the sorted set of all consumers > partitions := the set of all partitions > unassigned_partitions := the set of partitions not yet assigned, initialized > to be all partitions > unfilled_members := the set of consumers not yet at capacity, initialized to > empty > -max_capacity_members := the set of members with exactly C_c partitions > assigned, initialized to empty- > member.owned_partitions := the set of previously owned partitions encoded in > the Subscription > // Reassign as many previously owned partitions as possible, *by considering > the num_max_capacity_members* > for member : members > remove any partitions that are no longer in the subscription from its > owned partitions > remove all owned_partitions if the generation is old > if member.owned_partitions.size < C_f > assign all owned partitions to member and remove from > unassigned_partitions > add member to unfilled_members > -else if member.owned_partitions.size == C_f- > -assign first C_f owned_partitions to member and remove from > unassigned_partitions- > else if member.owned_partitions.size >= C_c *&& > num_max_capacity_members < C_r* > *assign first C_c owned_partiti
[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on pull request #10509: URL: https://github.com/apache/kafka/pull/10509#issuecomment-816379867 @ableegoldman, please help review this PR. The testLargeAssignmentAndGroupWithUniformSubscription test time is down from 28xx ms to 18xx ms, a 33% performance improvement. Yeah~
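The two-phase approach described in KAFKA-12464 can be sketched in a self-contained form as follows. This is an interpretation of the description, not the code in the pull request: `ConstrainedAssignSketch` and `assign` are hypothetical names, partitions and consumers are plain strings, and the input is assumed to be valid (at least one consumer, every owned partition exists, and no partition is owned twice). The final loop that hands any leftover partitions to members sitting at the floor quota is an addition of this sketch, needed when every member already owns at least the floor quota.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class ConstrainedAssignSketch {
    static Map<String, List<String>> assign(List<String> partitions, Map<String, List<String>> owned) {
        int n = owned.size();
        int minQuota = partitions.size() / n;            // floor capacity
        int maxQuota = (partitions.size() + n - 1) / n;  // ceiling capacity
        int expectedMax = partitions.size() % n;         // members allowed to hold maxQuota
        int numMax = 0;                                  // members holding maxQuota so far

        TreeSet<String> unassigned = new TreeSet<>(partitions);
        Map<String, List<String>> assignment = new TreeMap<>();
        List<String> unfilled = new ArrayList<>();
        List<String> atMin = new ArrayList<>();

        // Phase 1: keep previously owned partitions, capped by the quotas.
        for (Map.Entry<String, List<String>> e : new TreeMap<>(owned).entrySet()) {
            List<String> prev = e.getValue();
            List<String> kept;
            if (prev.size() >= maxQuota && numMax < expectedMax) {
                kept = new ArrayList<>(prev.subList(0, maxQuota));
                numMax++;
            } else if (prev.size() >= minQuota) {
                kept = new ArrayList<>(prev.subList(0, minQuota));
                atMin.add(e.getKey());
            } else {
                kept = new ArrayList<>(prev);
                unfilled.add(e.getKey());
            }
            unassigned.removeAll(kept); // anything not kept stays unassigned (revoked)
            assignment.put(e.getKey(), kept);
        }

        // Phase 2: fill members below quota, to maxQuota while max slots remain.
        for (String member : unfilled) {
            int target = minQuota;
            if (numMax < expectedMax) {
                target = maxQuota;
                numMax++;
            }
            List<String> list = assignment.get(member);
            while (list.size() < target) {
                list.add(unassigned.pollFirst());
            }
        }

        // Sketch-only step: hand leftovers to members that stopped at minQuota.
        for (String member : atMin) {
            if (unassigned.isEmpty()) {
                break;
            }
            assignment.get(member).add(unassigned.pollFirst());
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> partitions = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            partitions.add("t1p" + i);
        }
        Map<String, List<String>> owned = new TreeMap<>();
        owned.put("C0", partitions.subList(0, 5));
        owned.put("C1", partitions.subList(5, 10));
        owned.put("C2", List.of());
        System.out.println(assign(partitions, owned));
    }
}
```

For the ten-partition example from the issue, C0 keeps four of its partitions, C1 keeps three, and C2 picks up the three revoked partitions, so no stealing phase is needed.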
[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r610324204 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,117 +160,95 @@ private boolean allSubscriptionsEqual(Set allTopics, */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { +log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s", +partitionsPerTopic, consumerToOwnedPartitions)); + SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int numExpectedMaxCapacityMembers = unassignedPartitions.size() % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMaxCapacityMembers = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new 
ArrayList<>(maxQuota; Review comment: we should make the capacity to maxQuota to avoid memory reallocation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r610325577 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,117 +160,95 @@ private boolean allSubscriptionsEqual(Set allTopics, */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { +log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s", +partitionsPerTopic, consumerToOwnedPartitions)); + SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); Review comment: We don't need to keep the `maxCapacityMembers`/`minCapacityMembers` anymore because we can precisely know how many members can have max capacity now, by this ``` int numExpectedMaxCapacityMembers = unassignedPartitions.size() % numberOfConsumers; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
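The quota arithmetic behind this comment can be checked with a small worked example. The sketch below uses integer division instead of the `Math.floor`/`Math.ceil` calls in the diff, which gives the same values for non-negative counts. `QuotaSketch` and its method names are hypothetical.

```java
final class QuotaSketch {
    // Floor of partitions / consumers.
    static int minQuota(int partitions, int consumers) {
        return partitions / consumers;
    }

    // Ceiling of partitions / consumers.
    static int maxQuota(int partitions, int consumers) {
        return (partitions + consumers - 1) / consumers;
    }

    // Number of members that must hold maxQuota so that every partition is assigned.
    static int expectedMaxCapacityMembers(int partitions, int consumers) {
        return partitions % consumers;
    }

    public static void main(String[] args) {
        int p = 10;
        int n = 3;
        // 10 partitions over 3 consumers: quotas 3 and 4, with exactly 1 member at 4.
        System.out.println(minQuota(p, n) + " " + maxQuota(p, n) + " " + expectedMaxCapacityMembers(p, n));
    }
}
```

Since `partitions == consumers * minQuota + partitions % consumers`, exactly `partitions % consumers` members end up with one extra partition, which is why the two capacity queues are no longer needed.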
[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r610325987

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ##
@@ -163,117 +160,95 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
                                                                 Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
+        log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+            partitionsPerTopic, consumerToOwnedPartitions));
+
         SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
         int numberOfConsumers = consumerToOwnedPartitions.size();
         int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
         int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = unassignedPartitions.size() % numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
-        // initialize the assignment map with an empty array of size minQuota for all members
+        // initialize the assignment map with an empty array of size maxQuota for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                consumerAssignment.addAll(ownedPartitions);
+                unassignedPartitions.removeAll(ownedPartitions);
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
+                unassignedPartitions.removeAll(ownedPartitions.subList(0, maxQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned the "minQuota" of partitions or more
+                // so keep "minQuota" of the owned partitions, and revoke the rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, minQuota));
+                unassignedPartitions.removeAll(ownedPartitions.subList(0, minQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
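The quota arithmetic at the top of the hunk above can be checked in isolation. The following is a minimal sketch, not code from the PR: the class `QuotaSketch` and the method `quotas` are hypothetical names, and only the three formulas (`minQuota`, `maxQuota`, `numExpectedMaxCapacityMembers`) are taken from the diff.

```java
public class QuotaSketch {
    /**
     * Hypothetical helper: returns {minQuota, maxQuota, numExpectedMaxCapacityMembers}
     * using the same formulas as the diff above.
     */
    public static int[] quotas(int totalPartitions, int numberOfConsumers) {
        // every member gets at least floor(partitions / consumers)
        int minQuota = (int) Math.floor(((double) totalPartitions) / numberOfConsumers);
        // no member should get more than ceil(partitions / consumers)
        int maxQuota = (int) Math.ceil(((double) totalPartitions) / numberOfConsumers);
        // the remainder is how many members are expected to end up at maxQuota
        int numExpectedMaxCapacityMembers = totalPartitions % numberOfConsumers;
        return new int[] {minQuota, maxQuota, numExpectedMaxCapacityMembers};
    }

    public static void main(String[] args) {
        int[] q = quotas(7, 3);
        System.out.println("minQuota=" + q[0] + " maxQuota=" + q[1] + " expectedMax=" + q[2]);
    }
}
```

With 7 partitions and 3 consumers this gives a minQuota of 2, a maxQuota of 3, and one member expected at maxQuota (2 + 2 + 3 = 7). When the partitions divide evenly, minQuota equals maxQuota and the expected count is 0.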
[GitHub] [kafka] ableegoldman commented on pull request #10486: KAFKA-12492: Fix the formatting of example RocksDBConfigSetter
ableegoldman commented on pull request #10486: URL: https://github.com/apache/kafka/pull/10486#issuecomment-816382826 Just some unrelated test failures in `kafka.server.RaftClusterTest` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r610326449

## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java ##
@@ -215,8 +215,8 @@ public void testTwoConsumersTwoTopicsSixPartitions() {
         subscriptions.put(consumer2, new Subscription(topics(topic1, topic2)));
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), assignment.get(consumer1));
-        assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic1, 0), tp(topic1, 1), tp(topic1, 2)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic2, 0), tp(topic2, 1), tp(topic2, 2)), assignment.get(consumer2));

Review comment:
I no longer assign partitions round-robin in step 2, but I think this should not be a big deal, right?
[GitHub] [kafka] ableegoldman merged pull request #10486: KAFKA-12492: Fix the formatting of example RocksDBConfigSetter
ableegoldman merged pull request #10486: URL: https://github.com/apache/kafka/pull/10486
[jira] [Updated] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12492: --- Fix Version/s: 3.0.0 > Formatting of example RocksDBConfigSetter is messed up > -- > > Key: KAFKA-12492 > URL: https://issues.apache.org/jira/browse/KAFKA-12492 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Reporter: A. Sophie Blee-Goldman >Assignee: Ben Chen >Priority: Trivial > Labels: docs, newbie > Fix For: 3.0.0 > > > See the example implementation class CustomRocksDBConfig in the docs for the > rocksdb.config.setter > https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12634) Should checkpoint after restore finished
[ https://issues.apache.org/jira/browse/KAFKA-12634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317632#comment-17317632 ] A. Sophie Blee-Goldman commented on KAFKA-12634: I don't think bulk loading should affect the checkpoint; the data up to an offset is either in the state store or it isn't. I'd be fine with just doing a small KIP to fix this in 3.0+ though. > Should checkpoint after restore finished > > > Key: KAFKA-12634 > URL: https://issues.apache.org/jira/browse/KAFKA-12634 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Matthias J. Sax >Priority: Major > > For state stores, Kafka Streams maintains local checkpoint files to track the > offsets of the state store changelog topics. The checkpoint is updated on > commit or when a task is closed cleanly. > However, after a successful restore, the checkpoint is not written. Thus, if > an instance crashes after restore but before committing, even if the state is > on local disk the checkpoint file is missing (indicating that there is no > state) and thus state would be restored from scratch. > While for most cases, the time between restore end and next commit is small, > there are cases when this time could be large, for example if there is no new > input data to be processed (if there is no input data, the commit would be > skipped). > Thus, we should write the checkpoint file after a successful restore to close > this gap (of course, only for at-least-once processing).
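To make the gap described in the ticket concrete, here is a minimal sketch of writing a checkpoint immediately after a restore finishes and reading it back on the next start. It is an illustration only: `CheckpointSketch`, its methods, and the one-line-per-partition file layout are hypothetical and are not the Kafka Streams checkpoint implementation or its on-disk format.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointSketch {
    /** Writes "partition offset" lines to a temp file, then moves it into place. */
    public static void writeCheckpoint(Path file, Map<String, Long> offsets) throws IOException {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> e : new TreeMap<>(offsets).entrySet()) {
            lines.add(e.getKey() + " " + e.getValue());
        }
        // write to a sibling temp file first so a crash mid-write does not leave a truncated checkpoint
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Returns the checkpointed offsets, or an empty map if no checkpoint file exists. */
    public static Map<String, Long> readCheckpoint(Path file) throws IOException {
        Map<String, Long> offsets = new TreeMap<>();
        if (!Files.exists(file)) {
            return offsets; // a missing file is read as "no local state"
        }
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            int idx = line.lastIndexOf(' ');
            offsets.put(line.substring(0, idx), Long.parseLong(line.substring(idx + 1)));
        }
        return offsets;
    }

    /** Simulates "restore finished, checkpoint written, process restarted" in a temp directory. */
    public static Map<String, Long> restoreThenRestart(Map<String, Long> restoredOffsets) {
        try {
            Path checkpoint = Files.createTempDirectory("state-dir").resolve("checkpoint");
            writeCheckpoint(checkpoint, restoredOffsets); // the step the ticket proposes to add
            return readCheckpoint(checkpoint);            // what a restarted instance would see
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Without the `writeCheckpoint` call after restore, `readCheckpoint` returns an empty map on restart, which corresponds to the "restored from scratch" case in the ticket.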
[GitHub] [kafka] C0urante commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown
C0urante commented on a change in pull request #10503:
URL: https://github.com/apache/kafka/pull/10503#discussion_r610330282

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ##
@@ -185,8 +185,12 @@ private void doRun() throws InterruptedException {
             execute();
         } catch (Throwable t) {
-            log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t);
-            throw t;
+            if (!stopping && !cancelled) {

Review comment:
Hmm... I don't believe "cancelled" is a term we've used in public-facing surfaces in the past. For example, when a task takes too long to shut down now and we have to cancel it, we log the message that "Graceful stop... failed": https://github.com/apache/kafka/blob/5964401bf9aab611bd4a072941bd1c927e044258/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L866

Personally I think the additional code complexity is worth it; the original ticket mentions a case where these messages confuse users because they're generated for cancelled tasks, so I'd rather err on the side of making things as obvious as possible to them.

It might be possible to keep things simple and eliminate branches by tweaking the message to make it clear that newer task instances won't be impacted by this failure, though. A possible downside to this is that it might be confusing if there are no newer instances that will be brought up on the worker (because the connector has been deleted, the number of tasks has been reduced, or the task has been reassigned to another worker). But with some careful wording we might be able to avoid misleading people into thinking that this message implies there's already another instance running.
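The branching under discussion can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `TaskShutdownSketch` is a hypothetical stand-in for `WorkerTask`, the `logged` list stands in for the logger, and the behavior of the non-error branch (a quieter message, no rethrow) is an assumption, since the diff above only shows the `if (!stopping && !cancelled)` condition.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskShutdownSketch {
    private volatile boolean stopping = false;
    private volatile boolean cancelled = false;

    /** Stand-in for a logger so the example stays self-contained. */
    public final List<String> logged = new ArrayList<>();

    public void stop() {
        stopping = true;
    }

    public void cancel() {
        cancelled = true;
    }

    /** Runs the task body; only reports an unrecoverable failure if the task was not being shut down. */
    public void run(Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            if (!stopping && !cancelled) {
                logged.add("ERROR Task threw an uncaught and unrecoverable exception");
                throw e;
            }
            // assumption: during shutdown the failure is recorded quietly and not rethrown
            logged.add("DEBUG Task threw an exception while shutting down: " + e.getMessage());
        }
    }
}
```

A task that fails while running normally still produces the error message and rethrows; a task that fails after `stop()` or `cancel()` produces only the quieter message, which is the user-facing distinction the review comment is weighing.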
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317629#comment-17317629 ] A. Sophie Blee-Goldman commented on KAFKA-12492: Merged, thanks for the PR! Unfortunately we just barely missed the 2.8 release, as John cut the RC earlier today. If you want to see this fix in the 2.8 docs then you'll need to submit this exact PR against the kafka-site repo as we discussed, but the 2.8 RC is still under vote at the moment so you'd need to wait for that to be released at which point the docs in kafka/2.8 will be copied over to a new 28 subdirectory in kafka-site.
[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
[ https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317635#comment-17317635 ] A. Sophie Blee-Goldman commented on KAFKA-12638: Nope, no such thing as credits in Kafka -- technically anyone can pick up anything, and the major deciding factor is just your own confidence and familiarity with Kafka. If you're relatively new to Kafka and pick up something large that you need a lot of help with, you might struggle to get it done just because everyone who works on this is always busy (not because they don't want to help -- they do). We try to label things with newbie and/or newbie++ to indicate good entry-level tickets. That said, it never hurts to ask before picking something up -- often if it's a blocker ticket, or maybe critical, then it's likely that the person who reported it or someone they know already plans to work on it. But you can always ask -- and "Major" is the default priority which many tickets are just left at, so don't let that stop you. > Remove default implementation of ConsumerRebalanceListener#onPartitionsLost > --- > > Key: KAFKA-12638 > URL: https://issues.apache.org/jira/browse/KAFKA-12638 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Major > > When we added the #onPartitionsLost callback to the ConsumerRebalanceListener > in KIP-429, we gave it a default implementation that just invoked the > existing #onPartitionsRevoked method for backwards compatibility. This is > somewhat inconvenient, since we generally want to invoke #onPartitionsLost in > order to skip the committing of offsets on revoked partitions, whereas > committing those offsets is exactly what #onPartitionsRevoked typically does. 
> I don't think we can just remove it in 3.0 since we haven't indicated that we > "deprecated" the default implementation or logged a warning that we intend to > remove the default in a future release (as we did for the > RocksDBConfigSetter#close method in Streams, for example). We should try to > add such a warning now, so we can remove it in a future release.
[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
[ https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317638#comment-17317638 ] A. Sophie Blee-Goldman commented on KAFKA-12638: If you're interested in this ticket, we can't do the whole thing because of the compatibility concerns I mentioned but feel free to pick up the first part, and just log a warning if the user has not implemented the #onPartitionsLost callback. Something like this: https://github.com/apache/kafka/blob/2.8/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java#L61
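The "log a warning from the default method" approach suggested here might look roughly like the sketch below. To stay self-contained it uses a hypothetical local `RebalanceListener` interface that only mirrors the shape of `ConsumerRebalanceListener`; the partition type, the `WARNINGS` list standing in for a logger, and the warning text are all assumptions rather than Kafka code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class PartitionsLostSketch {
    /** Stand-in for a logger so the behavior can be observed without a logging dependency. */
    public static final List<String> WARNINGS = new ArrayList<>();

    /** Hypothetical interface mirroring the two callbacks discussed in the ticket. */
    public interface RebalanceListener {
        void onPartitionsRevoked(Collection<String> partitions);

        /** Default kept for backwards compatibility, but it now warns that it may go away. */
        default void onPartitionsLost(Collection<String> partitions) {
            WARNINGS.add("onPartitionsLost is not overridden; the default that delegates to "
                + "onPartitionsRevoked may be removed in a future release");
            onPartitionsRevoked(partitions);
        }
    }
}
```

A listener that overrides `onPartitionsLost` never hits the default, so it sees no warning; a listener that only implements `onPartitionsRevoked` keeps its current behavior and gets the notice, which is what allows the default to be removed later.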
[GitHub] [kafka] C0urante commented on pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor
C0urante commented on pull request #10315: URL: https://github.com/apache/kafka/pull/10315#issuecomment-816388437 Closing as this will likely be accomplished by KIP-726; can reopen if necessary.
[GitHub] [kafka] C0urante closed pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor
C0urante closed pull request #10315: URL: https://github.com/apache/kafka/pull/10315
[jira] [Commented] (KAFKA-12453) Guidance on whether a topology is eligible for optimisation
[ https://issues.apache.org/jira/browse/KAFKA-12453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317651#comment-17317651 ] Patrick O'Keeffe commented on KAFKA-12453: -- Thanks [~mjsax], I'm happy to raise a PR to update the docs. Just a couple of questions: # Why does the input topic need to be configured with log compaction? # I was going to update the section on optimisation in "config-streams.html" and the javadoc for StreamsBuilder.table - do any other docs spring to mind? > Guidance on whether a topology is eligible for optimisation > --- > > Key: KAFKA-12453 > URL: https://issues.apache.org/jira/browse/KAFKA-12453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Patrick O'Keeffe >Priority: Major > > Since the introduction of KStream.toTable() in Kafka 2.6.x, the decision > about whether a topology is eligible for optimisation is no longer a simple > one, and is related to whether toTable() operations are preceded by key > changing operators. > This decision requires expert level knowledge, and there are serious > implications associated with getting it wrong in terms of fault tolerance. > Some ideas spring to mind around how to guide developers to make the correct > decision: > # Topology.describe() could indicate whether this topology is eligible for > optimisation > # Topologies could be automatically optimised - note this may have an impact > at deployment time, in that an application reset may be required. The > developer would need to be made aware of this and adjust the deployment plan > accordingly
[jira] [Assigned] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
[ https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Chen reassigned KAFKA-12638: Assignee: Ben Chen
[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
[ https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317654#comment-17317654 ] Ben Chen commented on KAFKA-12638: Really appreciate it!
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317658#comment-17317658 ] Ben Chen commented on KAFKA-12492: Sounds good to me. I will do a follow-up PR.
[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
[ https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317670#comment-17317670 ] dengziming commented on KAFKA-12638: [~ben.c] Feel free to take as many issues as you can as long as you have enough time.
[GitHub] [kafka] dengziming opened a new pull request #10510: KAFKA-12607: Test case for resigned state vote granting
dengziming opened a new pull request #10510: URL: https://github.com/apache/kafka/pull/10510 *More detailed description of your change* As discussed in the Jira, `ResignedState` will transition to `VotedState` and grant the vote; this PR just adds some unit tests to verify that transition. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)