jeqo commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1288620620


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();

Review Comment:
   Similar here. We can initialize this and `uninitializedAt` in the constructor, and pass `SystemTime` in the test setup.
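   A rough sketch of that shape (only the extra `Time` parameter is new relative to this diff; production could pass `Time.SYSTEM` and tests a `MockTime`):
   ```java
   ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
                RemoteLogMetadataTopicPartitioner topicPartitioner,
                Function<Optional<String>, Consumer<byte[], byte[]>> consumerSupplier,
                Time time) {
       // Inject the clock instead of hard-coding SystemTime so tests can control it;
       // uninitializedAt is then derived inside the constructor.
       this.time = Objects.requireNonNull(time);
       this.uninitializedAt = time.milliseconds();
       // ... remaining assignments as in this diff ...
   }
   ```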



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;

Review Comment:
   Is this really needed?
   I can see the value is overwritten to 10L in the test setup, but I managed to test successfully with the default value of 100L.
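   If the override turns out to be unnecessary, the field could simply stay a constant, e.g.:
   ```java
   // No mutable static state; tests run against the default 100 ms poll interval.
   private static final long POLL_INTERVAL_MS = 100L;
   ```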
   



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
-    // It indicates whether the closing process has been started or not. If it is set as true,
-    // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-    private volatile boolean closing = false;
-
-    // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-    // determined that the consumer needs to be assigned with the updated partitions.
-    private volatile boolean assignPartitions = false;
+    // It indicates whether the ConsumerTask is closed or not.
+    private volatile boolean isClosed = false;
+    // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+    // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+    // ones it is no longer assigned to.
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. Received consumer records
-    // may or may not have been processed based on the assigned topic partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function<Optional<String>, Consumer<byte[], byte[]>> consumerSupplier) {

Review Comment:
   Why do we need a function here instead of passing the consumer as before? The input value seems to be ignored.
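   A sketch of the alternative this implies, with the caller building the consumer up front (constructor shape and names taken from this diff):
   ```java
   public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
                       RemoteLogMetadataTopicPartitioner topicPartitioner,
                       Consumer<byte[], byte[]> consumer) {
       // If the Optional<String> argument is always empty, the supplier adds nothing
       // over passing the already-constructed consumer directly.
       this.consumer = Objects.requireNonNull(consumer);
       this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
       this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
   }
   ```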



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##########
@@ -357,7 +357,9 @@ public void configure(Map<String, ?> configs) {
             log.info("Started configuring topic-based RLMM with configs: {}", 
configs);
 
             rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
-            rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
+            if (rlmmTopicPartitioner == null) {
+                rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());

Review Comment:
   nit:
   ```suggestion
                rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
   ```



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
-    // It indicates whether the closing process has been started or not. If it is set as true,
-    // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-    private volatile boolean closing = false;
-
-    // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-    // determined that the consumer needs to be assigned with the updated partitions.
-    private volatile boolean assignPartitions = false;
+    // It indicates whether the ConsumerTask is closed or not.
+    private volatile boolean isClosed = false;
+    // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+    // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+    // ones it is no longer assigned to.
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. Received consumer records
-    // may or may not have been processed based on the assigned topic partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function<Optional<String>, Consumer<byte[], byte[]>> consumerSupplier) {
+        this.consumer = consumerSupplier.apply(Optional.empty());
         this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-        this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map<Integer, Long> committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and assign them in the consumer.
-            committedOffsets = committedOffsetsFile.readEntries();
-        } catch (IOException e) {
-            // Ignore the error and consumer consumes from the earliest offset.
-            log.error("Encountered error while building committed offsets from the file. " +
-                              "Consumer will consume from the earliest offset for the assigned partitions.", e);
-        }
-
-        if (!committedOffsets.isEmpty()) {
-            // Assign topic partitions from the earlier committed offsets file.
-            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
-            assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions);
-            Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream()
-                                                                                   .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-                                                                                   .collect(Collectors.toSet());
-            consumer.assign(metadataTopicPartitions);
-
-            // Seek to the committed offsets
-            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
-                log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey());
-                partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());
-                consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue());
-            }
-
-            lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets);
-        }
     }
 
     @Override
     public void run() {
-        log.info("Started Consumer task thread.");
-        lastSyncedTimeMs = time.milliseconds();
-        try {
-            while (!closing) {
-                maybeWaitForPartitionsAssignment();
+        log.info("Starting consumer task thread.");
+        while (!isClosed) {
+            try {
+                if (isAssignmentChanged) {
+                    maybeWaitForPartitionsAssignment();
+                }
 
                 log.trace("Polling consumer to receive remote log metadata topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollIntervalMs));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                     processConsumerRecord(record);
                 }
-
-                maybeSyncCommittedDataAndOffsets(false);
+                maybeMarkUserPartitionsAsReady();
+            } catch (final WakeupException ex) {
+                // ignore logging the error
+                isClosed = true;
+            } catch (final RetriableException ex) {
+                log.warn("Retriable error occurred while processing the records. Retrying...", ex);
+            } catch (final Exception ex) {
+                isClosed = true;
+                log.error("Error occurred while processing the records", ex);
             }
-        } catch (Exception e) {
-            log.error("Error occurred in consumer task, close:[{}]", closing, e);
-        } finally {
-            maybeSyncCommittedDataAndOffsets(true);
-            closeConsumer();
-            log.info("Exiting from consumer task thread");
         }
+        try {
+            consumer.close(Duration.ofSeconds(30));
+        } catch (final Exception e) {
+            log.error("Error encountered while closing the consumer", e);
+        }
+        log.info("Exited from consumer task thread");
     }
 
     private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
-        // Taking assignPartitionsLock here as updateAssignmentsForPartitions changes assignedTopicPartitions
-        // and also calls remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for the removed
-        // partitions.
-        RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
-        synchronized (assignPartitionsLock) {
-            if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-                remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-            } else {
-                log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
-            }
-            log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition());
-            partitionToConsumedOffsets.put(record.partition(), record.offset());
+        final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
+        if (canProcess(remoteLogMetadata, record.offset())) {
+            remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset());
+        } else {
+            log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata);
        }
+        log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition());
+        readOffsetsByMetadataPartition.put(record.partition(), record.offset());
+    }
+
+    private boolean canProcess(final RemoteLogMetadata metadata, final long recordOffset) {
+        final TopicIdPartition tpId = metadata.topicIdPartition();
+        final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
+        return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset);
     }
 
-    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
-        // Return immediately if there is no consumption from last time.
-        boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
-        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
-            log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync);
+    private void maybeMarkUserPartitionsAsReady() {
+        if (isAllUserTopicPartitionsInitialized) {
             return;
         }
-
-        try {
-            // Need to take lock on assignPartitionsLock as assignedTopicPartitions might
-            // get updated by other threads.
-            synchronized (assignPartitionsLock) {
-                for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
-                    int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
-                    Long offset = partitionToConsumedOffsets.get(metadataPartition);
-                    if (offset != null) {
-                        remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset);
+        maybeFetchBeginAndEndOffsets();
+        boolean isAllInitialized = true;
+        for (final UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) {
+            if (utp.isAssigned && !utp.isInitialized) {
+                final Integer metadataPartition = utp.metadataPartition;
+                final BeginAndEndOffsetHolder holder = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
+                // The offset-holder can be null, when the recent assignment wasn't picked up by the consumer.
+                if (holder != null) {
+                    final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
+                    // 1) The end-offset was fetched only once during reassignment. The metadata-partition can receive
+                    // new stream of records, so the consumer can read records more than the last-fetched end-offset.
+                    // 2) When the internal topic becomes empty due to breach by size/time/start-offset, then there
+                    // are no records to read.
+                    if (readOffset + 1 >= holder.endOffset || holder.endOffset.equals(holder.beginOffset)) {
+                        markInitialized(utp);
                     } else {
-                        log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset",
-                                topicIdPartition, metadataPartition);
+                        log.debug("The user-topic-partition {} could not be marked initialized since the read-offset is {} " +
+                                "but the end-offset is {} for the metadata-partition {}", utp, readOffset, holder.endOffset,
+                            metadataPartition);
                     }
+                } else {
+                    log.debug("The offset-holder is null for the metadata-partition {}. The consumer may not have picked" +
+                            " up the recent assignment", metadataPartition);
                 }
-
-                // Write partitionToConsumedOffsets into committed offsets file as we do not want to process them again
-                // in case of restarts.
-                committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
-                lastSyncedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets);
             }
-
-            lastSyncedTimeMs = time.milliseconds();
-        } catch (IOException e) {
-            throw new KafkaException("Error encountered while writing committed offsets to a local file", e);
+            isAllInitialized = isAllInitialized && utp.isInitialized;
        }
-    }
-
-    private void closeConsumer() {
-        log.info("Closing the consumer instance");
-        try {
-            consumer.close(Duration.ofSeconds(30));
-        } catch (Exception e) {
-            log.error("Error encountered while closing the consumer", e);
+        if (isAllInitialized) {
+            log.info("Initialized for all the {} assigned user-partitions mapped to the {} meta-partitions in {} ms",
+                assignedUserTopicIdPartitions.size(), assignedMetadataPartitions.size(),
+                time.milliseconds() - uninitializedAt);
        }
+        isAllUserTopicPartitionsInitialized = isAllInitialized;
     }
 
-    private void maybeWaitForPartitionsAssignment() {
-        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+    void maybeWaitForPartitionsAssignment() throws InterruptedException {
+        // Snapshots of the metadata-partition and user-topic-partition are used to reduce the scope of the
+        // synchronization block.
+        // 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the user-topic-partitions from the request
+        //    handler threads. Those threads should not be blocked for a long time, therefore scope of the
+        //    synchronization block is reduced to bare minimum.
+        // 2) Note that the consumer#position, consumer#seekToBeginning, consumer#seekToEnd and the other consumer APIs
+        //    response times are un-predictable. Those should not be kept in the synchronization block.
+        final Set<Integer> metadataPartitionSnapshot = new HashSet<>();
+        final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot = new HashSet<>();
         synchronized (assignPartitionsLock) {
-            // If it is closing, return immediately. This should be inside the assignPartitionsLock as the closing is updated
-            // in close() method with in the same lock to avoid any race conditions.
-            if (closing) {
-                return;
+            while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) {
+                log.debug("Waiting for remote log metadata partitions to be assigned");
+                assignPartitionsLock.wait();
             }
-
-            while (assignedMetaPartitions.isEmpty()) {
-                // If no partitions are assigned, wait until they are assigned.
-                log.debug("Waiting for assigned remote log metadata partitions..");
-                try {
-                    // No timeout is set here, as it is always notified. Even when it is closed, the race can happen
-                    // between the thread calling this method and the thread calling close(). We should have a check
-                    // for closing as that might have been set and notified with assignPartitionsLock by `close`
-                    // method.
-                    assignPartitionsLock.wait();
-
-                    if (closing) {
-                        return;
-                    }
-                } catch (InterruptedException e) {
-                    throw new KafkaException(e);
-                }
-            }
-
-            if (assignPartitions) {
-                assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions);
-                // Removing unassigned meta partitions from partitionToConsumedOffsets and partitionToCommittedOffsets
-                partitionToConsumedOffsets.entrySet().removeIf(entry -> !assignedMetaPartitions.contains(entry.getKey()));
-
-                assignPartitions = false;
+            if (!isClosed && isAssignmentChanged) {
+                assignedUserTopicIdPartitions.values().forEach(utp -> {
+                    metadataPartitionSnapshot.add(utp.metadataPartition);
+                    assignedUserTopicIdPartitionsSnapshot.add(utp);
+                });
+                isAssignmentChanged = false;
             }
         }
-
-        if (!assignedMetaPartitionsSnapshot.isEmpty()) {
-            executeReassignment(assignedMetaPartitionsSnapshot);
+        if (!metadataPartitionSnapshot.isEmpty()) {
+            final Set<TopicPartition> remoteLogPartitions = toRemoteLogPartitions(metadataPartitionSnapshot);
+            consumer.assign(remoteLogPartitions);
+            this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot);
+            // for newly assigned user-partitions, read from the beginning of the corresponding metadata partition
+            final Set<TopicPartition> seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot
+                .stream()
+                .filter(utp -> !utp.isAssigned)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            consumer.seekToBeginning(seekToBeginOffsetPartitions);
+            // for other metadata partitions, read from the offset where the processing left last time.
+            remoteLogPartitions.stream()
+                .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) &&
+                    readOffsetsByMetadataPartition.containsKey(tp.partition()))
+                .forEach(tp -> consumer.seek(tp, readOffsetsByMetadataPartition.get(tp.partition())));
+            // mark all the user-topic-partitions as assigned to the consumer.
+            assignedUserTopicIdPartitionsSnapshot.forEach(utp -> {
+                if (!utp.isAssigned) {
+                    // Note that there can be a race between `remove` and `add` partition assignment. Calling the
+                    // `maybeLoadPartition` here again to be sure that the partition gets loaded on the handler.
+                    remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
+                    utp.isAssigned = true;
+                }
+            });
+            processedAssignmentOfUserTopicIdPartitions = assignedUserTopicIdPartitionsSnapshot.stream()
+                .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+            clearResourcesForUnassignedUserTopicPartitions(assignedUserTopicIdPartitionsSnapshot);
+            isAllUserTopicPartitionsInitialized = false;
+            uninitializedAt = time.milliseconds();
+            fetchBeginAndEndOffsets();
        }
     }
 
-    private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
-        Set<TopicPartition> assignedMetaTopicPartitions =
-                assignedMetaPartitionsSnapshot.stream()
-                                              .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
-                                              .collect(Collectors.toSet());
-        log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions);
-        consumer.assign(assignedMetaTopicPartitions);
+    private void clearResourcesForUnassignedUserTopicPartitions(Set<UserTopicIdPartition> assignedUTPs) {
+        Set<TopicIdPartition> assignedPartitions = assignedUTPs.stream()
+            .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+        // Note that there can be previously assigned user-topic-partitions where no records are there to read
+        // (eg) none of the segments for a partition were uploaded. Those partition resources won't be cleared.
+        // It can be fixed later when required since they are empty resources.
+        Set<TopicIdPartition> unassignedPartitions = readOffsetsByUserTopicPartition.keySet()
+            .stream()
+            .filter(e -> !assignedPartitions.contains(e))
+            .collect(Collectors.toSet());
+        unassignedPartitions.forEach(unassignedPartition -> {
+            remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition);
+            readOffsetsByUserTopicPartition.remove(unassignedPartition);
+        });
+        log.info("Unassigned user-topic-partitions: {}", unassignedPartitions.size());
     }
 
-    public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
-        updateAssignmentsForPartitions(partitions, Collections.emptySet());
+    public void addAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
+        updateAssignments(Objects.requireNonNull(partitions), Collections.emptySet());
     }
 
-    public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
-        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    public void removeAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
+        updateAssignments(Collections.emptySet(), Objects.requireNonNull(partitions));
     }
 
-    private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartitions,
-                                                Set<TopicIdPartition> removedPartitions) {
-        log.info("Updating assignments for addedPartitions: {} and removedPartition: {}", addedPartitions, removedPartitions);
+    private void updateAssignments(final Set<TopicIdPartition> addedPartitions,
+                                   final Set<TopicIdPartition> removedPartitions) {
+        log.info("Updating assignments for partitions added: {} and removed: {}", addedPartitions, removedPartitions);
+        if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) {
+            synchronized (assignPartitionsLock) {
+                // Make a copy of the existing assignments and update the copy.
+                final Map<TopicIdPartition, UserTopicIdPartition> updatedUserPartitions = new HashMap<>(assignedUserTopicIdPartitions);
+                addedPartitions.forEach(tpId -> updatedUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId)));
+                removedPartitions.forEach(updatedUserPartitions::remove);
+                if (!updatedUserPartitions.equals(assignedUserTopicIdPartitions)) {
+                    assignedUserTopicIdPartitions = Collections.unmodifiableMap(updatedUserPartitions);
+                    isAssignmentChanged = true;
+                    log.debug("Assigned user-topic-partitions: {}", assignedUserTopicIdPartitions);
+                    assignPartitionsLock.notifyAll();
+                }
+            }
+        }
+    }
 
-        Objects.requireNonNull(addedPartitions, "addedPartitions must not be null");
-        Objects.requireNonNull(removedPartitions, "removedPartitions must not be null");
+    public Optional<Long> receivedOffsetForPartition(final int partition) {
+        return Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
+    }
 
-        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
-            return;
-        }
+    public boolean isMetadataPartitionAssigned(final int partition) {
+        return assignedMetadataPartitions.contains(partition);
+    }
 
-        synchronized (assignPartitionsLock) {
-            Set<TopicIdPartition> updatedReassignedPartitions = new HashSet<>(assignedTopicPartitions);
-            updatedReassignedPartitions.addAll(addedPartitions);
-            updatedReassignedPartitions.removeAll(removedPartitions);
-            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
-            for (TopicIdPartition tp : updatedReassignedPartitions) {
-                updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
-            }
+    public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
+        final UserTopicIdPartition utp = assignedUserTopicIdPartitions.get(partition);
+        return utp != null && utp.isAssigned;
+    }
 
-            // Clear removed topic partitions from in-memory cache.
-            for (TopicIdPartition removedPartition : removedPartitions) {
-                remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
+    @Override
+    public void close() {
+        if (!isClosed) {
+            log.info("Closing the instance");
+            synchronized (assignPartitionsLock) {
+                isClosed = true;
+                assignedUserTopicIdPartitions.values().forEach(this::markInitialized);
+                consumer.wakeup();
+                assignPartitionsLock.notifyAll();
             }
+        }
+    }
 
-            assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions);
-            log.debug("Assigned topic partitions: {}", assignedTopicPartitions);
+    public Set<Integer> metadataPartitionsAssigned() {
+        return Collections.unmodifiableSet(assignedMetadataPartitions);
+    }
 
-            if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) {
-                assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions);
-                log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions);
+    private void fetchBeginAndEndOffsets() {
+        try {
+            final Set<TopicPartition> unInitializedPartitions = assignedUserTopicIdPartitions.values().stream()

Review Comment:
   nit 
   ```suggestion
               final Set<TopicPartition> uninitializedPartitions = assignedUserTopicIdPartitions.values().stream()
   ```



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java:
##########
@@ -185,4 +190,15 @@ public void maybeLoadPartition(TopicIdPartition partition) {
             topicIdPartition -> new FileBasedRemoteLogMetadataCache(topicIdPartition, partitionLogDirectory(topicIdPartition.topicPartition())));
     }

Review Comment:
   nit: `maybeLoadPartition` should have the `@Override` annotation now.
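   Something like this, assuming the method is now declared on the parent `RemotePartitionMetadataEventHandler`:
   ```java
   @Override
   public void maybeLoadPartition(TopicIdPartition partition) {
       // Body unchanged from the diff above; the annotation just lets the compiler
       // verify the signature keeps matching the parent declaration.
   }
   ```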



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
-    // It indicates whether the closing process has been started or not. If it is set as true,
-    // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-    private volatile boolean closing = false;
-
-    // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-    // determined that the consumer needs to be assigned with the updated partitions.
-    private volatile boolean assignPartitions = false;
+    // It indicates whether the ConsumerTask is closed or not.
+    private volatile boolean isClosed = false;
+    // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+    // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+    // ones it is no longer assigned to.
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. Received consumer records
-    // may or may not have been processed based on the assigned topic partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,

Review Comment:
   Haven't looked too deeply into this, but could you elaborate on why this file is not needed anymore?
   
   btw, `COMMITTED_OFFSETS_FILE_NAME` can be removed from ConsumerManager.



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##########
@@ -357,7 +357,9 @@ public void configure(Map<String, ?> configs) {
             log.info("Started configuring topic-based RLMM with configs: {}", 
configs);
 
             rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
-            rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
+            if (rlmmTopicPartitioner == null) {

Review Comment:
   Can't we just set the topic partitioner _after_ `configure` in the test harness and remove this validation here?
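   For example (a hypothetical test-harness flow; the setter name is illustrative and not part of this PR):
   ```java
   // Configure first, then swap in the test partitioner afterwards.
   TopicBasedRemoteLogMetadataManager rlmm = new TopicBasedRemoteLogMetadataManager();
   rlmm.configure(configs);                        // builds the default partitioner
   rlmm.setRlmmTopicPartitioner(testPartitioner);  // hypothetical test-only setter
   ```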



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
-    // It indicates whether the closing process has been started or not. If it is set as true,
-    // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-    private volatile boolean closing = false;
-
-    // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-    // determined that the consumer needs to be assigned with the updated partitions.
-    private volatile boolean assignPartitions = false;
+    // It indicates whether the ConsumerTask is closed or not.
+    private volatile boolean isClosed = false;
+    // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+    // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+    // ones it is no longer assigned to.
+    private volatile boolean isAssignmentChanged = true;

Review Comment:
   nit
   ```suggestion
       private volatile boolean hasAssignmentChanged = true;
   ```



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##########
@@ -515,6 +517,11 @@ public void startConsumerThread() {
         }
     }

Review Comment:
   btw, `startConsumerThread` is unused; can it be removed?



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
-    // It indicates whether the closing process has been started or not. If it 
is set as true,
-    // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-    private volatile boolean closing = false;
-
-    // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-    // determined that the consumer needs to be assigned with the updated 
partitions.
-    private volatile boolean assignPartitions = false;
+    // It indicates whether the ConsumerTask is closed or not.
+    private volatile boolean isClosed = false;
+    // It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+    // has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+    // ones it is no longer assigned to.
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the 
assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = 
Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = 
Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> 
assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-    // may or may not have been processed based on the assigned topic 
partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> 
offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function<Optional<String>, Consumer<byte[], byte[]>> 
consumerSupplier) {
+        this.consumer = consumerSupplier.apply(Optional.empty());
         this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-        this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map<Integer, Long> committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and assign them in the consumer.
-            committedOffsets = committedOffsetsFile.readEntries();
-        } catch (IOException e) {
-            // Ignore the error and consumer consumes from the earliest offset.
-            log.error("Encountered error while building committed offsets from 
the file. " +
-                              "Consumer will consume from the earliest offset 
for the assigned partitions.", e);
-        }
-
-        if (!committedOffsets.isEmpty()) {
-            // Assign topic partitions from the earlier committed offsets file.
-            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
-            assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
-            Set<TopicPartition> metadataTopicPartitions = 
earlierAssignedPartitions.stream()
-                                                                               
    .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-                                                                               
    .collect(Collectors.toSet());
-            consumer.assign(metadataTopicPartitions);
-
-            // Seek to the committed offsets
-            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) 
{
-                log.debug("Updating consumed offset: [{}] for partition [{}]", 
entry.getValue(), entry.getKey());
-                partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
-                consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
-            }
-
-            lastSyncedPartitionToConsumedOffsets = 
Collections.unmodifiableMap(committedOffsets);
-        }
     }
 
     @Override
     public void run() {
-        log.info("Started Consumer task thread.");
-        lastSyncedTimeMs = time.milliseconds();
-        try {
-            while (!closing) {
-                maybeWaitForPartitionsAssignment();
+        log.info("Starting consumer task thread.");
+        while (!isClosed) {
+            try {
+                if (isAssignmentChanged) {
+                    maybeWaitForPartitionsAssignment();
+                }
 
                 log.trace("Polling consumer to receive remote log metadata 
topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollIntervalMs));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                     processConsumerRecord(record);
                 }
-
-                maybeSyncCommittedDataAndOffsets(false);
+                maybeMarkUserPartitionsAsReady();
+            } catch (final WakeupException ex) {
+                // ignore logging the error
+                isClosed = true;
+            } catch (final RetriableException ex) {
+                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
+            } catch (final Exception ex) {
+                isClosed = true;
+                log.error("Error occurred while processing the records", ex);
             }
-        } catch (Exception e) {
-            log.error("Error occurred in consumer task, close:[{}]", closing, 
e);
-        } finally {
-            maybeSyncCommittedDataAndOffsets(true);
-            closeConsumer();
-            log.info("Exiting from consumer task thread");
         }
+        try {
+            consumer.close(Duration.ofSeconds(30));
+        } catch (final Exception e) {
+            log.error("Error encountered while closing the consumer", e);
+        }
+        log.info("Exited from consumer task thread");
     }
 
     private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
-        // Taking assignPartitionsLock here as updateAssignmentsForPartitions 
changes assignedTopicPartitions
-        // and also calls 
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for 
the removed
-        // partitions.
-        RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
-        synchronized (assignPartitionsLock) {
-            if 
(assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-                
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-            } else {
-                log.debug("This event {} is skipped as the topic partition is 
not assigned for this instance.", remoteLogMetadata);
-            }
-            log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
-            partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+        final RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
+        if (canProcess(remoteLogMetadata, record.offset())) {
+            
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), 
record.offset());
+        } else {
+            log.debug("The event {} is skipped because it is either already 
processed or not assigned to this consumer", remoteLogMetadata);
         }
+        log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
+        readOffsetsByMetadataPartition.put(record.partition(), 
record.offset());
+    }
+
+    private boolean canProcess(final RemoteLogMetadata metadata, final long 
recordOffset) {
+        final TopicIdPartition tpId = metadata.topicIdPartition();
+        final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
+        return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && 
(readOffset == null || readOffset < recordOffset);
     }
 
-    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
-        // Return immediately if there is no consumption from last time.
-        boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
-        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
-            log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync);
+    private void maybeMarkUserPartitionsAsReady() {
+        if (isAllUserTopicPartitionsInitialized) {
            return;
        }
-
-        try {
-            // Need to take lock on assignPartitionsLock as assignedTopicPartitions might
-            // get updated by other threads.
-            synchronized (assignPartitionsLock) {
-                for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
-                    int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
-                    Long offset = partitionToConsumedOffsets.get(metadataPartition);
-                    if (offset != null) {
-                        remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset);
+        maybeFetchBeginAndEndOffsets();
+        boolean isAllInitialized = true;
+        for (final UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) {
+            if (utp.isAssigned && !utp.isInitialized) {
+                final Integer metadataPartition = utp.metadataPartition;
+                final BeginAndEndOffsetHolder holder = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
+                // The offset-holder can be null when the recent assignment wasn't picked up by the consumer.
+                if (holder != null) {
+                    final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
+                    // 1) The end-offset was fetched only once during reassignment. The metadata-partition can receive
+                    // a new stream of records, so the consumer can read records beyond the last-fetched end-offset.
+                    // 2) When the internal topic becomes empty due to a size/time/start-offset retention breach,
+                    // there are no records to read.
+                    if (readOffset + 1 >= holder.endOffset || holder.endOffset.equals(holder.beginOffset)) {
+                        markInitialized(utp);
                    } else {
-                        log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset",
-                                topicIdPartition, metadataPartition);
+                        log.debug("The user-topic-partition {} could not be marked initialized since the read-offset is {} " +
+                                "but the end-offset is {} for the metadata-partition {}", utp, readOffset, holder.endOffset,
+                            metadataPartition);
                    }
+                } else {
+                    log.debug("The offset-holder is null for the metadata-partition {}. The consumer may not have picked" +
+                            " up the recent assignment", metadataPartition);
                }
-
-                // Write partitionToConsumedOffsets into committed offsets file as we do not want to process them again
-                // in case of restarts.
-                committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
-                lastSyncedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets);
            }
-
-            lastSyncedTimeMs = time.milliseconds();
-        } catch (IOException e) {
-            throw new KafkaException("Error encountered while writing committed offsets to a local file", e);
+            isAllInitialized = isAllInitialized && utp.isInitialized;
        }
-    }
-
-    private void closeConsumer() {
-        log.info("Closing the consumer instance");
-        try {
-            consumer.close(Duration.ofSeconds(30));
-        } catch (Exception e) {
-            log.error("Error encountered while closing the consumer", e);
+        if (isAllInitialized) {
+            log.info("Initialized for all the {} assigned user-partitions mapped to the {} meta-partitions in {} ms",
+                assignedUserTopicIdPartitions.size(), assignedMetadataPartitions.size(),
+                time.milliseconds() - uninitializedAt);
        }
+        isAllUserTopicPartitionsInitialized = isAllInitialized;
    }
 
-    private void maybeWaitForPartitionsAssignment() {
-        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+    void maybeWaitForPartitionsAssignment() throws InterruptedException {
+        // Snapshots of the metadata-partition and user-topic-partition are used to reduce the scope of the
+        // synchronization block.
+        // 1) LEADER_AND_ISR and STOP_REPLICA requests add / remove the user-topic-partitions from the request
+        //    handler threads. Those threads should not be blocked for a long time, therefore the scope of the
+        //    synchronization block is reduced to the bare minimum.
+        // 2) Note that the response times of consumer#position, consumer#seekToBeginning, consumer#seekToEnd and
+        //    the other consumer APIs are unpredictable. Those calls should not be kept in the synchronization block.
+        final Set<Integer> metadataPartitionSnapshot = new HashSet<>();
+        final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot = new HashSet<>();
        synchronized (assignPartitionsLock) {
-            // If it is closing, return immediately. This should be inside the assignPartitionsLock as the closing is updated
-            // in close() method within the same lock to avoid any race conditions.
-            if (closing) {
-                return;
+            while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) {
+                log.debug("Waiting for remote log metadata partitions to be assigned");
+                assignPartitionsLock.wait();
            }
-
-            while (assignedMetaPartitions.isEmpty()) {
-                // If no partitions are assigned, wait until they are assigned.
-                log.debug("Waiting for assigned remote log metadata partitions..");
-                try {
-                    // No timeout is set here, as it is always notified. Even when it is closed, the race can happen
-                    // between the thread calling this method and the thread calling close(). We should have a check
-                    // for closing as that might have been set and notified with assignPartitionsLock by the `close`
-                    // method.
-                    assignPartitionsLock.wait();
-
-                    if (closing) {
-                        return;
-                    }
-                } catch (InterruptedException e) {
-                    throw new KafkaException(e);
-                }
-            }
-
-            if (assignPartitions) {
-                assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions);
-                // Removing unassigned meta partitions from partitionToConsumedOffsets and partitionToCommittedOffsets
-                partitionToConsumedOffsets.entrySet().removeIf(entry -> !assignedMetaPartitions.contains(entry.getKey()));
-
-                assignPartitions = false;
+            if (!isClosed && isAssignmentChanged) {
+                assignedUserTopicIdPartitions.values().forEach(utp -> {
+                    metadataPartitionSnapshot.add(utp.metadataPartition);
+                    assignedUserTopicIdPartitionsSnapshot.add(utp);
+                });
+                isAssignmentChanged = false;
            }
        }
-
-        if (!assignedMetaPartitionsSnapshot.isEmpty()) {
-            executeReassignment(assignedMetaPartitionsSnapshot);
+        if (!metadataPartitionSnapshot.isEmpty()) {
+            final Set<TopicPartition> remoteLogPartitions = toRemoteLogPartitions(metadataPartitionSnapshot);
+            consumer.assign(remoteLogPartitions);
+            this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot);
+            // For newly assigned user-partitions, read from the beginning of the corresponding metadata partition.
+            final Set<TopicPartition> seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot
+                .stream()
+                .filter(utp -> !utp.isAssigned)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            consumer.seekToBeginning(seekToBeginOffsetPartitions);
+            // For other metadata partitions, read from the offset where the processing left off last time.
+            remoteLogPartitions.stream()
+                .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) &&
+                    readOffsetsByMetadataPartition.containsKey(tp.partition()))
+                .forEach(tp -> consumer.seek(tp, readOffsetsByMetadataPartition.get(tp.partition())));
+            // Mark all the user-topic-partitions as assigned to the consumer.
+            assignedUserTopicIdPartitionsSnapshot.forEach(utp -> {
+                if (!utp.isAssigned) {
+                    // Note that there can be a race between `remove` and `add` partition assignment. Calling
+                    // `maybeLoadPartition` here again to be sure that the partition gets loaded on the handler.
+                    remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
+                    utp.isAssigned = true;
+                }
+            });
+            processedAssignmentOfUserTopicIdPartitions = assignedUserTopicIdPartitionsSnapshot.stream()
+                .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+            clearResourcesForUnassignedUserTopicPartitions(assignedUserTopicIdPartitionsSnapshot);
+            isAllUserTopicPartitionsInitialized = false;
+            uninitializedAt = time.milliseconds();
+            fetchBeginAndEndOffsets();
        }
    }
 
-    private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
-        Set<TopicPartition> assignedMetaTopicPartitions =
-                assignedMetaPartitionsSnapshot.stream()
-                                              .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
-                                              .collect(Collectors.toSet());
-        log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions);
-        consumer.assign(assignedMetaTopicPartitions);
+    private void clearResourcesForUnassignedUserTopicPartitions(Set<UserTopicIdPartition> assignedUTPs) {
+        Set<TopicIdPartition> assignedPartitions = assignedUTPs.stream()
+            .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+        // Note that there can be previously assigned user-topic-partitions with no records to read,
+        // e.g. when none of the segments for a partition were uploaded. Those partition resources won't be cleared.
+        // It can be fixed later when required since they are empty resources.
+        Set<TopicIdPartition> unassignedPartitions = readOffsetsByUserTopicPartition.keySet()
+            .stream()
+            .filter(e -> !assignedPartitions.contains(e))
+            .collect(Collectors.toSet());
+        unassignedPartitions.forEach(unassignedPartition -> {
+            remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition);
+            readOffsetsByUserTopicPartition.remove(unassignedPartition);
+        });
+        log.info("Unassigned user-topic-partitions: {}", unassignedPartitions.size());
    }
 
-    public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
-        updateAssignmentsForPartitions(partitions, Collections.emptySet());
+    public void addAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
+        updateAssignments(Objects.requireNonNull(partitions), Collections.emptySet());
    }

-    public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
-        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    public void removeAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
+        updateAssignments(Collections.emptySet(), Objects.requireNonNull(partitions));
    }
 
-    private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartitions,
-                                                Set<TopicIdPartition> removedPartitions) {
-        log.info("Updating assignments for addedPartitions: {} and removedPartition: {}", addedPartitions, removedPartitions);
+    private void updateAssignments(final Set<TopicIdPartition> addedPartitions,
+                                   final Set<TopicIdPartition> removedPartitions) {
+        log.info("Updating assignments for partitions added: {} and removed: {}", addedPartitions, removedPartitions);
+        if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) {
+            synchronized (assignPartitionsLock) {
+                // Make a copy of the existing assignments and update the copy.
+                final Map<TopicIdPartition, UserTopicIdPartition> updatedUserPartitions = new HashMap<>(assignedUserTopicIdPartitions);
+                addedPartitions.forEach(tpId -> updatedUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId)));
+                removedPartitions.forEach(updatedUserPartitions::remove);
+                if (!updatedUserPartitions.equals(assignedUserTopicIdPartitions)) {
+                    assignedUserTopicIdPartitions = Collections.unmodifiableMap(updatedUserPartitions);
+                    isAssignmentChanged = true;
+                    log.debug("Assigned user-topic-partitions: {}", assignedUserTopicIdPartitions);
+                    assignPartitionsLock.notifyAll();
+                }
+            }
+        }
+    }
 
-        Objects.requireNonNull(addedPartitions, "addedPartitions must not be 
null");
-        Objects.requireNonNull(removedPartitions, "removedPartitions must not 
be null");
+    public Optional<Long> receivedOffsetForPartition(final int partition) {
+        return 
Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
+    }
 
-        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
-            return;
-        }
+    public boolean isMetadataPartitionAssigned(final int partition) {
+        return assignedMetadataPartitions.contains(partition);
+    }
 
-        synchronized (assignPartitionsLock) {
-            Set<TopicIdPartition> updatedReassignedPartitions = new 
HashSet<>(assignedTopicPartitions);
-            updatedReassignedPartitions.addAll(addedPartitions);
-            updatedReassignedPartitions.removeAll(removedPartitions);
-            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
-            for (TopicIdPartition tp : updatedReassignedPartitions) {
-                
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
-            }
+    public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
+        final UserTopicIdPartition utp = 
assignedUserTopicIdPartitions.get(partition);
+        return utp != null && utp.isAssigned;
+    }
 
-            // Clear removed topic partitions from in-memory cache.
-            for (TopicIdPartition removedPartition : removedPartitions) {
-                
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
+    @Override
+    public void close() {
+        if (!isClosed) {
+            log.info("Closing the instance");
+            synchronized (assignPartitionsLock) {
+                isClosed = true;
+                
assignedUserTopicIdPartitions.values().forEach(this::markInitialized);
+                consumer.wakeup();
+                assignPartitionsLock.notifyAll();
             }
+        }
+    }
 
-            assignedTopicPartitions = 
Collections.unmodifiableSet(updatedReassignedPartitions);
-            log.debug("Assigned topic partitions: {}", 
assignedTopicPartitions);
+    public Set<Integer> metadataPartitionsAssigned() {
+        return Collections.unmodifiableSet(assignedMetadataPartitions);
+    }
 
-            if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) {
-                assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions);
-                log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions);
+    private void fetchBeginAndEndOffsets() {
+        try {
+            final Set<TopicPartition> unInitializedPartitions = assignedUserTopicIdPartitions.values().stream()
+                .filter(utp -> utp.isAssigned && !utp.isInitialized)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            // Remove the previous offset holder if it exists. During reassignment, if the list-offset
+            // call for the `earliest` and `latest` offsets fails, then we should not use the previous values.
+            unInitializedPartitions.forEach(x -> offsetHolderByMetadataPartition.remove(x));
+            if (!unInitializedPartitions.isEmpty()) {
+                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(unInitializedPartitions);
+                Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(unInitializedPartitions);
+                offsetHolderByMetadataPartition = endOffsets.entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> new BeginAndEndOffsetHolder(beginOffsets.get(e.getKey()), e.getValue())));

-                assignPartitions = true;
-                assignPartitionsLock.notifyAll();
-            } else {
-                log.debug("No change in assigned metadata topic partitions: {}", assignedMetaPartitions);
            }
+            isOffsetsFetchFailed = false;
+        } catch (final RetriableException ex) {
+            // Ignore the LEADER_NOT_AVAILABLE error; this can happen when the partition leader is not yet assigned.
+            isOffsetsFetchFailed = true;
+            lastFailedFetchOffsetsTimestamp = time.milliseconds();
        }
    }
 
-    public Optional<Long> receivedOffsetForPartition(int partition) {
-        return Optional.ofNullable(partitionToConsumedOffsets.get(partition));
+    private void maybeFetchBeginAndEndOffsets() {
+        // If the leader for a `__remote_log_metadata` partition is not available, then the call to `ListOffsets`
+        // will fail after the default timeout of 1 min. A delay of 5 min is added between the retries to prevent
+        // the thread from aggressively fetching the list offsets. During this time, the recently reassigned
+        // user-topic-partitions won't be marked as initialized.
+        if (isOffsetsFetchFailed && lastFailedFetchOffsetsTimestamp + 300_000 < time.milliseconds()) {
+            fetchBeginAndEndOffsets();
+        }
    }

-    public boolean isPartitionAssigned(int partition) {
-        return assignedMetaPartitions.contains(partition);
+    private UserTopicIdPartition newUserTopicIdPartition(final TopicIdPartition tpId) {
+        return new UserTopicIdPartition(tpId, topicPartitioner.metadataPartition(tpId));
    }
 
-    public void close() {
-        if (!closing) {
-            synchronized (assignPartitionsLock) {
-                // Closing should be updated only after acquiring the lock to avoid a race in
-                // maybeWaitForPartitionsAssignment() where it waits on assignPartitionsLock. It should not wait
-                // if the closing is already set.
-                closing = true;
-                consumer.wakeup();
-                assignPartitionsLock.notifyAll();
-            }
+    private void markInitialized(final UserTopicIdPartition utp) {
+        // Silently skip initializing the utp when it is not yet assigned to this consumer.
+        if (!utp.isAssigned) {
+            log.warn("Tried to initialize a UTP: {} that was not yet assigned!", utp);
+            return;
+        }
+        if (!utp.isInitialized) {
+            remotePartitionMetadataEventHandler.markInitialized(utp.topicIdPartition);
+            utp.isInitialized = true;
        }
    }
 
-    public Set<Integer> metadataPartitionsAssigned() {
-        return Collections.unmodifiableSet(assignedMetaPartitions);
+    static Set<TopicPartition> toRemoteLogPartitions(final Set<Integer> partitions) {
+        return partitions.stream()
+            .map(ConsumerTask::toRemoteLogPartition)
+            .collect(Collectors.toSet());
+    }
+
+    static TopicPartition toRemoteLogPartition(int partition) {
+        return new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partition);
+    }
+
+    static class UserTopicIdPartition {
+        private final TopicIdPartition topicIdPartition;
+        private final Integer metadataPartition;
+        // The `utp` will be initialized once the consumer reads all the existing events from the remote log metadata topic.
+        boolean isInitialized;
+        // Denotes whether this `utp` is assigned to the consumer.
+        boolean isAssigned;
+
+        /**
+         * UserTopicIdPartition denotes a user topic-partition for which this broker acts as a leader or follower.
+         *
+         * @param tpId               the unique topic partition identifier
+         * @param metadataPartition  the remote log metadata partition mapped for this user-topic-partition
+         */
+        public UserTopicIdPartition(final TopicIdPartition tpId, final Integer metadataPartition) {
+            this.topicIdPartition = Objects.requireNonNull(tpId);
+            this.metadataPartition = Objects.requireNonNull(metadataPartition);
+            this.isInitialized = false;
+            this.isAssigned = false;
+        }
+
+        @Override
+        public String toString() {
+            return "UserTopicIdPartition{" +
+                "topicIdPartition=" + topicIdPartition +
+                ", metadataPartition=" + metadataPartition +
+                ", isInitialized=" + isInitialized +
+                ", isAssigned=" + isAssigned +
+                '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            UserTopicIdPartition that = (UserTopicIdPartition) o;
+            return topicIdPartition.equals(that.topicIdPartition) && metadataPartition.equals(that.metadataPartition);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(topicIdPartition, metadataPartition);
+        }
+    }
+
+    static class BeginAndEndOffsetHolder {
+        Long beginOffset;

Review Comment:
   nit: RemoteLogSegmentMetadata uses startOffset|endOffset, so this holder could follow the same naming.
   ```suggestion
       static class StartAndEndOffsets {
           Long startOffset;
   ```


