[ https://issues.apache.org/jira/browse/KAFKA-14625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677875#comment-17677875 ]
Sagar Rao commented on KAFKA-14625:
-----------------------------------

hey [~satish.duggana], I am assuming you plan to work on this? Nonetheless, I took a look at the two APIs and the places from which they are called. The main difference that I see is that `CommittedOffsetsFile` works with a map, while the other two Scala classes, `OffsetCheckpointFile` and `LeaderEpochCheckpointFile`, interoperate with Java lists and Scala sequences. So the main difference is in `CommittedOffsetsFile`, which operates on the entry set of `partitionToConsumedOffsets`.

I am thinking we could maintain another list of `TopicPartitionOffsets` objects that stores the topic/partition offsets, while the map `partitionToConsumedOffsets` stores the same objects keyed by partition (the same key as today). We keep updating the list as and when entries are added to or removed from the map, and when we want to sync, we can pass the list as is. A very crude version of the idea in this Java program:

{code:java}
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class TestEquality {

    static class TopicPartitionOffsets {
        Integer tp;
        Long offset;

        public TopicPartitionOffsets(Integer tp, Long offset) {
            this.tp = tp;
            this.offset = offset;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            TopicPartitionOffsets that = (TopicPartitionOffsets) o;
            return Objects.equals(tp, that.tp) && Objects.equals(offset, that.offset);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tp, offset);
        }

        @Override
        public String toString() {
            return "TopicPartitionOffsets{" + "tp=" + tp + ", offset=" + offset + '}';
        }
    }

    public static void main(String[] args) {
        Map<Integer, TopicPartitionOffsets> partitionToConsumedOffsets = new ConcurrentHashMap<>();
        LinkedList<TopicPartitionOffsets> topicPartitionOffsets = new LinkedList<>();

        // The same object is referenced by both the map and the list.
        TopicPartitionOffsets tp1 = new TopicPartitionOffsets(1, 100L);
        partitionToConsumedOffsets.put(1, tp1);
        topicPartitionOffsets.add(tp1);
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets
                + ", topicPartitionOffsets:" + topicPartitionOffsets);

        // Mutating the shared object is visible through both structures.
        tp1.offset = 200L;
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets
                + ", topicPartitionOffsets:" + topicPartitionOffsets);

        // The same holds when the object is looked up via the map.
        TopicPartitionOffsets tp2 = partitionToConsumedOffsets.get(1);
        tp2.offset = 300L;
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets
                + ", topicPartitionOffsets:" + topicPartitionOffsets);

        // Removing the entry from both structures keeps them consistent.
        topicPartitionOffsets.remove(tp2);
        partitionToConsumedOffsets.remove(1);
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets
                + ", topicPartitionOffsets:" + topicPartitionOffsets);
    }
}
{code}

I am using a LinkedList here so that removing from the list becomes easier (it adds extra time complexity, though). Also, the two updates on the data structures should happen in an atomic fashion, i.e. if one of them fails, the other should not be applied either; a rough sketch of such a helper is included below the quoted issue text.

> CheckpointFile read and write API consistency
> ----------------------------------------------
>
>                 Key: KAFKA-14625
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14625
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Satish Duggana
>            Priority: Major
>
> `CheckpointFile` has the below read and write APIs: write expects a
> Collection of items, but read returns a List of elements. It is better to
> look into these APIs and their usages and see whether consistency can be
> brought without introducing any extra collection conversions.
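A minimal sketch of the atomic-update idea mentioned above, assuming the `TopicPartitionOffsets` class from the earlier program is extracted to a top-level class; `ConsumedOffsetsHolder` and its method names are hypothetical, not from the Kafka codebase:

{code:java}
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical holder that mutates the map and the list under one lock,
// so a caller never observes one structure updated without the other.
public class ConsumedOffsetsHolder {
    private final Map<Integer, TopicPartitionOffsets> partitionToConsumedOffsets = new HashMap<>();
    private final LinkedList<TopicPartitionOffsets> topicPartitionOffsets = new LinkedList<>();

    // Add or replace the entry for a partition in both structures.
    public synchronized void put(Integer partition, Long offset) {
        TopicPartitionOffsets entry = new TopicPartitionOffsets(partition, offset);
        TopicPartitionOffsets previous = partitionToConsumedOffsets.put(partition, entry);
        if (previous != null) {
            topicPartitionOffsets.remove(previous); // O(n) scan of the linked list
        }
        topicPartitionOffsets.add(entry);
    }

    // Remove the entry for a partition from both structures.
    public synchronized void remove(Integer partition) {
        TopicPartitionOffsets removed = partitionToConsumedOffsets.remove(partition);
        if (removed != null) {
            topicPartitionOffsets.remove(removed);
        }
    }

    // Copy the list for syncing to the checkpoint file, so callers never
    // touch the internal list outside the lock.
    public synchronized List<TopicPartitionOffsets> snapshot() {
        return new LinkedList<>(topicPartitionOffsets);
    }
}
{code}

Because every mutation and the snapshot go through the same lock, the map and the list cannot diverge, and a plain HashMap suffices in place of the ConcurrentHashMap; the trade-off is the O(n) list scan on removal and the O(n) copy per sync.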
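For reference, the inconsistency the quoted issue describes, rendered as simplified hypothetical signatures (not the exact Kafka code), next to one possible consistent shape:

{code:java}
import java.io.IOException;
import java.util.Collection;
import java.util.List;

// Roughly the shape described in the issue: write accepts a Collection,
// while read returns a List.
interface CheckpointReadWrite<T> {
    void write(Collection<T> entries) throws IOException;
    List<T> read() throws IOException;
}

// One possible consistent variant: the same collection type on both sides,
// so call sites need no extra conversions.
interface ConsistentCheckpointReadWrite<T> {
    void write(Collection<T> entries) throws IOException;
    Collection<T> read() throws IOException;
}
{code}

Whether read should widen to Collection or write should narrow to List depends on the call sites; the point is only that both sides agree.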