abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286089506


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
+    // TODO - Update comments below
     // It indicates whether the closing process has been started or not. If it 
is set as true,
     // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-    private volatile boolean closing = false;
-
+    private volatile boolean isClosed = false;
     // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
     // determined that the consumer needs to be assigned with the updated 
partitions.
-    private volatile boolean assignPartitions = false;
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the 
assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = 
Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = 
Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> 
assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-    // may or may not have been processed based on the assigned topic 
partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> 
offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function<Optional<String>, Consumer<byte[], byte[]>> 
consumerSupplier) {
+        this.consumer = consumerSupplier.apply(Optional.empty());
         this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-        this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map<Integer, Long> committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and assign them in the consumer.
-            committedOffsets = committedOffsetsFile.readEntries();
-        } catch (IOException e) {
-            // Ignore the error and consumer consumes from the earliest offset.
-            log.error("Encountered error while building committed offsets from 
the file. " +
-                              "Consumer will consume from the earliest offset 
for the assigned partitions.", e);
-        }
-
-        if (!committedOffsets.isEmpty()) {
-            // Assign topic partitions from the earlier committed offsets file.
-            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
-            assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
-            Set<TopicPartition> metadataTopicPartitions = 
earlierAssignedPartitions.stream()
-                                                                               
    .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-                                                                               
    .collect(Collectors.toSet());
-            consumer.assign(metadataTopicPartitions);
-
-            // Seek to the committed offsets
-            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) 
{
-                log.debug("Updating consumed offset: [{}] for partition [{}]", 
entry.getValue(), entry.getKey());
-                partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
-                consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
-            }
-
-            lastSyncedPartitionToConsumedOffsets = 
Collections.unmodifiableMap(committedOffsets);
-        }
     }
 
     @Override
     public void run() {
-        log.info("Started Consumer task thread.");
-        lastSyncedTimeMs = time.milliseconds();
-        try {
-            while (!closing) {
-                maybeWaitForPartitionsAssignment();
+        log.info("Starting consumer task thread.");
+        while (!isClosed) {
+            try {
+                if (isAssignmentChanged) {
+                    maybeWaitForPartitionsAssignment();
+                }
 
                 log.trace("Polling consumer to receive remote log metadata 
topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollIntervalMs));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                     processConsumerRecord(record);
                 }
-
-                maybeSyncCommittedDataAndOffsets(false);
+                maybeMarkUserPartitionsAsReady();
+            } catch (final WakeupException ex) {
+                // ignore logging the error
+                isClosed = true;
+            } catch (final RetriableException ex) {
+                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
+            }  catch (final Exception ex) {
+                isClosed = true;
+                log.error("Error occurred while processing the records", ex);
             }
-        } catch (Exception e) {
-            log.error("Error occurred in consumer task, close:[{}]", closing, 
e);
-        } finally {
-            maybeSyncCommittedDataAndOffsets(true);
-            closeConsumer();
-            log.info("Exiting from consumer task thread");
         }
+        try {
+            consumer.close(Duration.ofSeconds(30));
+        } catch (final Exception e) {
+            log.error("Error encountered while closing the consumer", e);
+        }
+        log.info("Exited from consumer task thread");
     }
 
     private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
-        // Taking assignPartitionsLock here as updateAssignmentsForPartitions 
changes assignedTopicPartitions
-        // and also calls 
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for 
the removed
-        // partitions.
-        RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
-        synchronized (assignPartitionsLock) {
-            if 
(assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-                
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-            } else {
-                log.debug("This event {} is skipped as the topic partition is 
not assigned for this instance.", remoteLogMetadata);
-            }
-            log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
-            partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+        final RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
+        if (canProcess(remoteLogMetadata, record.offset())) {
+            
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), 
record.offset());
+        } else {
+            log.debug("The event {} is skipped because it is either already 
processed or not assigned to this consumer", remoteLogMetadata);
         }
+        log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
+        readOffsetsByMetadataPartition.put(record.partition(), 
record.offset());
+    }
+
+    private boolean canProcess(final RemoteLogMetadata metadata, final long 
recordOffset) {
+        final TopicIdPartition tpId = metadata.topicIdPartition();
+        final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
+        return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && 
(readOffset == null || readOffset < recordOffset);
     }
 
-    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
-        // Return immediately if there is no consumption from last time.
-        boolean noConsumedOffsetUpdates = 
partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
-        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - 
lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
-            log.debug("Skip syncing committed offsets, 
noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, 
forceSync);
+    private void maybeMarkUserPartitionsAsReady() {
+        if (isAllUserTopicPartitionsInitialized) {
             return;
         }
-
-        try {
-            // Need to take lock on assignPartitionsLock as 
assignedTopicPartitions might
-            // get updated by other threads.
-            synchronized (assignPartitionsLock) {
-                for (TopicIdPartition topicIdPartition : 
assignedTopicPartitions) {
-                    int metadataPartition = 
topicPartitioner.metadataPartition(topicIdPartition);
-                    Long offset = 
partitionToConsumedOffsets.get(metadataPartition);
-                    if (offset != null) {
-                        
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, 
metadataPartition, offset);
-                    } else {
-                        log.debug("Skipping sync-up of the 
remote-log-metadata-file for partition: [{}] , with remote log metadata 
partition{}, and no offset",
-                                topicIdPartition, metadataPartition);
+        maybeFetchBeginAndEndOffsets();
+        boolean isAllInitialized = true;
+        for (final UserTopicIdPartition utp : 
assignedUserTopicIdPartitions.values()) {
+            if (utp.isAssigned && !utp.isInitialized) {
+                final Integer metadataPartition = utp.metadataPartition;
+                final BeginAndEndOffsetHolder holder = 
offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
+                // The offset-holder can be null, when the recent assignment 
wasn't picked up by the consumer.
+                if (holder != null) {
+                    final Long readOffset = 
readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
+                    // 1) The end-offset was fetched only once during 
reassignment. The metadata-partition can receive
+                    // new stream of records, so the consumer can read records 
more than the last-fetched end-offset.
+                    // 2) When the internal topic becomes empty due to breach 
by size/time/start-offset, then there
+                    // are no records to read.
+                    if (readOffset + 1 >= holder.endOffset || 
holder.endOffset.equals(holder.beginOffset)) {
+                        markInitialized(utp);
                     }
                 }
-
-                // Write partitionToConsumedOffsets into committed offsets 
file as we do not want to process them again
-                // in case of restarts.
-                committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
-                lastSyncedPartitionToConsumedOffsets = new 
HashMap<>(partitionToConsumedOffsets);
             }
-
-            lastSyncedTimeMs = time.milliseconds();
-        } catch (IOException e) {
-            throw new KafkaException("Error encountered while writing 
committed offsets to a local file", e);
+            isAllInitialized = isAllInitialized && utp.isInitialized;
         }
-    }
-
-    private void closeConsumer() {
-        log.info("Closing the consumer instance");
-        try {
-            consumer.close(Duration.ofSeconds(30));
-        } catch (Exception e) {
-            log.error("Error encountered while closing the consumer", e);
+        if (isAllInitialized) {
+            log.info("Initialized for all the {} assigned user-partitions 
mapped to the {} meta-partitions in {} ms",
+                assignedUserTopicIdPartitions.size(), 
assignedMetadataPartitions.size(),
+                time.milliseconds() - uninitializedAt);
         }
+        isAllUserTopicPartitionsInitialized = isAllInitialized;
     }
 
-    private void maybeWaitForPartitionsAssignment() {
-        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+    void maybeWaitForPartitionsAssignment() throws InterruptedException {
+        // Snapshots of the metadata-partition and user-topic-partition are 
used to reduce the scope of the
+        // synchronization block.
+        // 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the 
user-topic-partitions from the request
+        //    handler threads. Those threads should not be blocked for a long 
time, therefore scope of the
+        //    synchronization block is reduced to bare minimum.
+        // 2) Note that the consumer#position, consumer#seekToBeginning, 
consumer#seekToEnd and the other consumer APIs
+        //    response times are un-predictable. Those should not be kept in 
the synchronization block.
+        final Set<Integer> metadataPartitionSnapshot = new HashSet<>();
+        final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot 
= new HashSet<>();
         synchronized (assignPartitionsLock) {
-            // If it is closing, return immediately. This should be inside the 
assignPartitionsLock as the closing is updated
-            // in close() method with in the same lock to avoid any race 
conditions.
-            if (closing) {
-                return;
+            while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) {
+                log.debug("Waiting for remote log metadata partitions to be 
assigned");
+                assignPartitionsLock.wait();
             }
-
-            while (assignedMetaPartitions.isEmpty()) {
-                // If no partitions are assigned, wait until they are assigned.
-                log.debug("Waiting for assigned remote log metadata 
partitions..");
-                try {
-                    // No timeout is set here, as it is always notified. Even 
when it is closed, the race can happen
-                    // between the thread calling this method and the thread 
calling close(). We should have a check
-                    // for closing as that might have been set and notified 
with assignPartitionsLock by `close`
-                    // method.
-                    assignPartitionsLock.wait();
-
-                    if (closing) {
-                        return;
-                    }
-                } catch (InterruptedException e) {
-                    throw new KafkaException(e);
-                }
-            }
-
-            if (assignPartitions) {
-                assignedMetaPartitionsSnapshot = new 
HashSet<>(assignedMetaPartitions);
-                // Removing unassigned meta partitions from 
partitionToConsumedOffsets and partitionToCommittedOffsets
-                partitionToConsumedOffsets.entrySet().removeIf(entry -> 
!assignedMetaPartitions.contains(entry.getKey()));
-
-                assignPartitions = false;
+            if (!isClosed && isAssignmentChanged) {
+                assignedUserTopicIdPartitions.values().forEach(utp -> {
+                    metadataPartitionSnapshot.add(utp.metadataPartition);
+                    assignedUserTopicIdPartitionsSnapshot.add(utp);
+                });
+                isAssignmentChanged = false;
             }
         }
-
-        if (!assignedMetaPartitionsSnapshot.isEmpty()) {
-            executeReassignment(assignedMetaPartitionsSnapshot);
+        if (!metadataPartitionSnapshot.isEmpty()) {
+            final Set<TopicPartition> remoteLogPartitions = 
toRemoteLogPartitions(metadataPartitionSnapshot);
+            consumer.assign(remoteLogPartitions);
+            this.assignedMetadataPartitions = 
Collections.unmodifiableSet(metadataPartitionSnapshot);
+            // for newly assigned user-partitions, read from the beginning of 
the corresponding metadata partition
+            final Set<TopicPartition> seekToBeginOffsetPartitions = 
assignedUserTopicIdPartitionsSnapshot
+                .stream()
+                .filter(utp -> !utp.isAssigned)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            consumer.seekToBeginning(seekToBeginOffsetPartitions);
+            // for other metadata partitions, read from the offset where the 
processing left last time.
+            remoteLogPartitions.stream()
+                .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) &&
+                    readOffsetsByMetadataPartition.containsKey(tp.partition()))
+                .forEach(tp -> consumer.seek(tp, 
readOffsetsByMetadataPartition.get(tp.partition())));
+            // mark all the user-topic-partitions as assigned to the consumer.
+            assignedUserTopicIdPartitionsSnapshot.forEach(utp -> {
+                if (!utp.isAssigned) {
+                    // Note that there can be a race between `remove` and 
`add` partition assignment. Calling the
+                    // `maybeLoadPartition` here again to be sure that the 
partition gets loaded on the handler.
+                    
remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
+                    utp.isAssigned = true;
+                }
+            });
+            processedAssignmentOfUserTopicIdPartitions = 
assignedUserTopicIdPartitionsSnapshot.stream()
+                .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+            
clearResourcesForUnassignedUserTopicPartitions(assignedUserTopicIdPartitionsSnapshot);
+            isAllUserTopicPartitionsInitialized = false;
+            uninitializedAt = time.milliseconds();
+            fetchBeginAndEndOffsets();
         }
     }
 
-    private void executeReassignment(Set<Integer> 
assignedMetaPartitionsSnapshot) {
-        Set<TopicPartition> assignedMetaTopicPartitions =
-                assignedMetaPartitionsSnapshot.stream()
-                                              .map(partitionNum -> new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
-                                              .collect(Collectors.toSet());
-        log.info("Reassigning partitions to consumer task [{}]", 
assignedMetaTopicPartitions);
-        consumer.assign(assignedMetaTopicPartitions);
+    private void 
clearResourcesForUnassignedUserTopicPartitions(Set<UserTopicIdPartition> 
assignedUTPs) {
+        Set<TopicIdPartition> assignedPartitions = assignedUTPs.stream()
+            .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+        // Note that there can be previously assigned user-topic-partitions 
where no records are there to read
+        // (eg) none of the segments for a partition were uploaded. Those 
partition resources won't be cleared.
+        // It can be fixed later when required since they are empty resources.
+        Set<TopicIdPartition> unassignedPartitions = 
readOffsetsByUserTopicPartition.keySet()
+            .stream()
+            .filter(e -> !assignedPartitions.contains(e))
+            .collect(Collectors.toSet());
+        unassignedPartitions.forEach(unassignedPartition -> {
+            
remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition);
+            readOffsetsByUserTopicPartition.remove(unassignedPartition);
+        });
+        log.info("Unassigned user-topic-partitions: {}", 
unassignedPartitions.size());
+    }
+
+    public void addAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+        updateAssignments(Objects.requireNonNull(partitions), 
Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+        updateAssignments(Collections.emptySet(), 
Objects.requireNonNull(partitions));
     }
 
-    public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
-        updateAssignmentsForPartitions(partitions, Collections.emptySet());
+    private void updateAssignments(final Set<TopicIdPartition> addedPartitions,
+                                   final Set<TopicIdPartition> 
removedPartitions) {
+        log.info("Updating assignments for partitions added: {} and removed: 
{}", addedPartitions, removedPartitions);
+        if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) {
+            synchronized (assignPartitionsLock) {
+                final Map<TopicIdPartition, UserTopicIdPartition> 
idealUserPartitions = new HashMap<>(assignedUserTopicIdPartitions);
+                addedPartitions.forEach(tpId -> 
idealUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId)));
+                removedPartitions.forEach(idealUserPartitions::remove);
+                if 
(!idealUserPartitions.equals(assignedUserTopicIdPartitions)) {
+                    assignedUserTopicIdPartitions = 
Collections.unmodifiableMap(idealUserPartitions);
+                    isAssignmentChanged = true;
+                }
+                if (isAssignmentChanged) {
+                    log.debug("Assigned user-topic-partitions: {}", 
assignedUserTopicIdPartitions);
+                    assignPartitionsLock.notifyAll();
+                }
+            }
+        }
     }
 
-    public void removeAssignmentsForPartitions(Set<TopicIdPartition> 
partitions) {
-        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    public Optional<Long> receivedOffsetForPartition(final int partition) {
+        return 
Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
     }
 
-    private void updateAssignmentsForPartitions(Set<TopicIdPartition> 
addedPartitions,
-                                                Set<TopicIdPartition> 
removedPartitions) {
-        log.info("Updating assignments for addedPartitions: {} and 
removedPartition: {}", addedPartitions, removedPartitions);
+    public boolean isMetadataPartitionAssigned(final int partition) {
+        return assignedMetadataPartitions.contains(partition);
+    }
 
-        Objects.requireNonNull(addedPartitions, "addedPartitions must not be 
null");
-        Objects.requireNonNull(removedPartitions, "removedPartitions must not 
be null");
+    public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
+        final UserTopicIdPartition utp = 
assignedUserTopicIdPartitions.get(partition);
+        return utp != null && utp.isAssigned;
+    }
 
-        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
-            return;
+    @Override
+    public void close() {
+        if (!isClosed) {
+            log.info("Closing the instance");
+            synchronized (assignPartitionsLock) {
+                isClosed = true;
+                
assignedUserTopicIdPartitions.values().forEach(this::markInitialized);
+                consumer.wakeup();
+                assignPartitionsLock.notifyAll();
+            }
         }
+    }
 
-        synchronized (assignPartitionsLock) {
-            Set<TopicIdPartition> updatedReassignedPartitions = new 
HashSet<>(assignedTopicPartitions);
-            updatedReassignedPartitions.addAll(addedPartitions);
-            updatedReassignedPartitions.removeAll(removedPartitions);
-            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
-            for (TopicIdPartition tp : updatedReassignedPartitions) {
-                
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
-            }
+    public Set<Integer> metadataPartitionsAssigned() {
+        return Collections.unmodifiableSet(assignedMetadataPartitions);
+    }
+
+    private void fetchBeginAndEndOffsets() {
+        try {
+            final Set<TopicPartition> unInitializedPartitions = 
assignedUserTopicIdPartitions.values().stream()
+                .filter(utp -> utp.isAssigned && !utp.isInitialized)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            // Removing the previous offset holder if it exists. During 
reassignment, if the list-offset
+            // call to `earliest` and `latest` offset fails, then we should 
not use the previous values.
+            unInitializedPartitions.forEach(x -> 
offsetHolderByMetadataPartition.remove(x));
+            if (!unInitializedPartitions.isEmpty()) {
+                Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(unInitializedPartitions);
+                Map<TopicPartition, Long> beginOffsets = 
consumer.beginningOffsets(unInitializedPartitions);
+                offsetHolderByMetadataPartition = endOffsets.entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> new 
BeginAndEndOffsetHolder(beginOffsets.get(e.getKey()), e.getValue())));
 
-            // Clear removed topic partitions from in-memory cache.
-            for (TopicIdPartition removedPartition : removedPartitions) {
-                
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
             }
+            isOffsetsFetchFailed = false;
+        } catch (final RetriableException ex) {
+            // ignore LEADER_NOT_AVAILABLE error, this can happen when the 
partition leader is not yet assigned.
+            isOffsetsFetchFailed = true;
+            lastFailedFetchOffsetsTimestamp = time.milliseconds();
+        }
+    }
 
-            assignedTopicPartitions = 
Collections.unmodifiableSet(updatedReassignedPartitions);
-            log.debug("Assigned topic partitions: {}", 
assignedTopicPartitions);
+    private void maybeFetchBeginAndEndOffsets() {
+        // If the leader for a `__remote_log_metadata` partition is not 
available, then the call to `ListOffsets`
+        // will fail after the default timeout of 1 min. Added a delay of 5 
min in between the retries to prevent the
+        // thread from aggressively fetching the list offsets. During this 
time, the recently reassigned
+        // user-topic-partitions won't be marked as initialized.
+        if (isOffsetsFetchFailed && lastFailedFetchOffsetsTimestamp + 300_000 
< time.milliseconds()) {
+            fetchBeginAndEndOffsets();
+        }

Review Comment:
   When the leader is offline, it may take some time to come back up. In that 
case, we want to avoid aggressively fetching the offsets. Otherwise, we would 
frequently retry the offsets fetch and get blocked for 1 min each time (until 
the API times out), which would prevent us from processing updates for the 
already assigned topic partitions.
   
   It is basically a tradeoff (when the leader is offline) between picking up 
the newly assigned topic partitions as soon as possible and always staying 
up-to-date for the already assigned topic partitions. We chose the second 
option here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to