chia7712 commented on code in PR #20083: URL: https://github.com/apache/kafka/pull/20083#discussion_r2185863531
########## storage/src/main/java/org/apache/kafka/storage/internals/log/CommittedPartitionState.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.metadata.LeaderRecoveryState; + +import java.util.Set; + +public record CommittedPartitionState(Set<Integer> isr, LeaderRecoveryState leaderRecoveryState) implements PartitionState { + + @Override + public Set<Integer> maximalIsr() { + return isr; + } + + @Override + public Boolean isInflight() { + return false; + } + + @Override + public String toString() { Review Comment: the information of this custom `toString` is almost equal to auto-generated `toString`, right? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/PendingPartitionChange.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.metadata.LeaderAndIsr; +import org.apache.kafka.metadata.LeaderRecoveryState; + +public interface PendingPartitionChange extends PartitionState { + CommittedPartitionState lastCommittedState(); + LeaderAndIsr sentLeaderAndIsr(); + + default LeaderRecoveryState leaderRecoveryState() { Review Comment: this is not final method, so perhaps we should leave it to sub class to implement it. ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -1949,9 +1799,9 @@ class Partition(val topicPartition: TopicPartition, // 2) leaderAndIsr.partitionEpoch == partitionEpoch: No update was performed since proposed and actual state are the same. // In both cases, we want to move from Pending to Committed state to ensure new updates are processed. - partitionState = CommittedPartitionState(leaderAndIsr.isr.asScala.map(_.toInt).toSet, leaderAndIsr.leaderRecoveryState) + partitionState = new CommittedPartitionState(util.Set.copyOf(leaderAndIsr.isr), leaderAndIsr.leaderRecoveryState) Review Comment: Could you change `leaderAndIsr.isr` to return immutable set? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/PendingExpandIsr.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.metadata.LeaderAndIsr; + +import java.util.HashSet; +import java.util.Set; + +public record PendingExpandIsr(int newInSyncReplicaId, + LeaderAndIsr sentLeaderAndIsr, + CommittedPartitionState lastCommittedState) implements PendingPartitionChange { + + @Override + public void notifyListener(AlterPartitionListener alterPartitionListener) { + alterPartitionListener.markIsrExpand(); + } + + @Override + public Set<Integer> isr() { + return lastCommittedState.isr(); + } + + @Override + public Set<Integer> maximalIsr() { + Set<Integer> newIsr = new HashSet<>(lastCommittedState.isr()); + newIsr.add(newInSyncReplicaId); + return Set.copyOf(newIsr); Review Comment: Could you please use `Collections.unmodifiableSet` instead to avoid extra deep copy? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionState.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.metadata.LeaderRecoveryState; + +import java.util.Set; + +public interface PartitionState { + /** + * Includes only the in-sync replicas which have been committed to ZK. + */ + Set<Integer> isr(); + + /** + * This set may include un-committed ISR members following an expansion. This "effective" ISR is used for advancing + * the high watermark as well as determining which replicas are required for acks=all produce requests. + * + * Only applicable as of IBP 2.7-IV2, for older versions this will return the committed ISR + */ + Set<Integer> maximalIsr(); + + /** + * The leader recovery state. See the description for LeaderRecoveryState for details on the different values. + */ + LeaderRecoveryState leaderRecoveryState(); + + /** + * Indicates if we have an AlterPartition request inflight. + */ + Boolean isInflight(); Review Comment: `boolean` ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -1779,7 +1629,7 @@ class Partition(val topicPartition: TopicPartition, // When shrinking the ISR, we cannot assume that the update will succeed as this could // erroneously advance the HW if the `AlterPartition` were to fail. Hence the "maximal ISR" // for `PendingShrinkIsr` is the current ISR. - val isrToSend = partitionState.isr -- outOfSyncReplicaIds + val isrToSend = partitionState.isr.asScala.map(_.toInt).toSet -- outOfSyncReplicaIds Review Comment: This will create many temporary collections. How about `partitionState.isr.asScala.map(_.toInt).diff(outOfSyncReplicaIds)` ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -1098,7 +948,7 @@ class Partition(val topicPartition: TopicPartition, case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset" } - val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica) + val curInSyncReplicaObjects = (curMaximalIsr.asScala.map(_.toInt) - localBrokerId).flatMap(getReplica) Review Comment: ditto ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -1298,7 +1148,7 @@ class Partition(val topicPartition: TopicPartition, def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = { val current = partitionState if (!current.isInflight) { - val candidateReplicaIds = current.isr - localBrokerId + val candidateReplicaIds = current.isr.asScala.map(_.toInt).toSet - localBrokerId Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org