abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299873620


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +63,403 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
-
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    // The timeout for the consumer to poll records from the remote log metadata topic.
+    private final long pollTimeoutMs;
     private final Time time;
 
-    // It indicates whether the closing process has been started or not. If it is set as true,
-    // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-    private volatile boolean closing = false;
-
-    // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-    // determined that the consumer needs to be assigned with the updated partitions.
-    private volatile boolean assignPartitions = false;
+    // It indicates whether the ConsumerTask is closed or not.
+    private volatile boolean isClosed = false;
+    // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+    // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+    // ones it is no longer assigned to.
+    // The initial value is set to true so that the task waits for a partition assignment on the first execution;
+    // otherwise the thread would stay busy without actually doing anything.
+    private volatile boolean hasAssignmentChanged = true;
 
     // It represents a lock for any operations related to the assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. Received consumer records
-    // may or may not have been processed based on the assigned topic partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+    private long uninitializedAt;
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, StartAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean hasLastOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
+    // The interval between retries to fetch the start and end offsets for the metadata partitions after a failed fetch.
+    private final long offsetFetchRetryIntervalMs;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Consumer<byte[], byte[]> consumer,
+                        long pollTimeoutMs,
+                        long offsetFetchRetryIntervalMs,
+                        Time time) {
+        this.consumer = consumer;
         this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+        this.pollTimeoutMs = pollTimeoutMs;
+        this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
         this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map<Integer, Long> committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and assign them in the consumer.
-            committedOffsets = committedOffsetsFile.readEntries();
-        } catch (IOException e) {
-            // Ignore the error and consumer consumes from the earliest offset.
-            log.error("Encountered error while building committed offsets from 
the file. " +
-                              "Consumer will consume from the earliest offset 
for the assigned partitions.", e);
-        }
-
-        if (!committedOffsets.isEmpty()) {
-            // Assign topic partitions from the earlier committed offsets file.
-            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
-            assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions);
-            Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream()
-                                                                                   .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-                                                                                   .collect(Collectors.toSet());
-            consumer.assign(metadataTopicPartitions);
-
-            // Seek to the committed offsets
-            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
-                log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey());
-                partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());
-                consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue());
-            }
-
-            lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets);
-        }
+        this.uninitializedAt = time.milliseconds();
     }
 
     @Override
     public void run() {
-        log.info("Started Consumer task thread.");
-        lastSyncedTimeMs = time.milliseconds();
-        try {
-            while (!closing) {
-                maybeWaitForPartitionsAssignment();
+        log.info("Starting consumer task thread.");
+        while (!isClosed) {
+            try {
+                if (hasAssignmentChanged) {
+                    maybeWaitForPartitionAssignments();
+                }
 
                 log.trace("Polling consumer to receive remote log metadata 
topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                     processConsumerRecord(record);
                 }
-
-                maybeSyncCommittedDataAndOffsets(false);
+                maybeMarkUserPartitionsAsReady();
+            } catch (final WakeupException ex) {
+                // Wakeup is the signal to close; swallow the exception without logging it.
+                isClosed = true;
+            } catch (final RetriableException ex) {
+                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
+            } catch (final Exception ex) {
+                isClosed = true;
+                log.error("Error occurred while processing the records", ex);
             }
-        } catch (Exception e) {
-            log.error("Error occurred in consumer task, close:[{}]", closing, 
e);
-        } finally {
-            maybeSyncCommittedDataAndOffsets(true);
-            closeConsumer();
-            log.info("Exiting from consumer task thread");
         }
+        try {
+            consumer.close(Duration.ofSeconds(30));
+        } catch (final Exception e) {
+            log.error("Error encountered while closing the consumer", e);
+        }
+        log.info("Exited from consumer task thread");
     }
 
     private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
-        // Taking assignPartitionsLock here as updateAssignmentsForPartitions changes assignedTopicPartitions
-        // and also calls remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for the removed
-        // partitions.
-        RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
-        synchronized (assignPartitionsLock) {
-            if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-                remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-            } else {
-                log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
-            }
-            log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition());
-            partitionToConsumedOffsets.put(record.partition(), record.offset());
+        final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
+        if (shouldProcess(remoteLogMetadata, record.offset())) {
+            remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset());
+        } else {
+            log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata);
         }
+        log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition());
+        readOffsetsByMetadataPartition.put(record.partition(), record.offset());
     }
 
-    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
-        // Return immediately if there is no consumption from last time.
-        boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
-        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
-            log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync);
+    private boolean shouldProcess(final RemoteLogMetadata metadata, final long recordOffset) {
+        final TopicIdPartition tpId = metadata.topicIdPartition();
+        final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
+        return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset);
+    }
+
+    private void maybeMarkUserPartitionsAsReady() {
+        if (isAllUserTopicPartitionsInitialized) {
             return;
         }
-
-        try {
-            // Need to take lock on assignPartitionsLock as assignedTopicPartitions might
-            // get updated by other threads.
-            synchronized (assignPartitionsLock) {
-                for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
-                    int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
-                    Long offset = partitionToConsumedOffsets.get(metadataPartition);
-                    if (offset != null) {
-                        remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset);
+        maybeFetchStartAndEndOffsets();
+        boolean isAllInitialized = true;
+        for (final UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) {
+            if (utp.isAssigned && !utp.isInitialized) {
+                final Integer metadataPartition = utp.metadataPartition;
+                final StartAndEndOffsetHolder holder = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
+                // The offset-holder can be null when the recent assignment wasn't picked up by the consumer.
+                if (holder != null) {
+                    final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
+                    // 1) The end-offset was fetched only once, during reassignment. The metadata-partition can receive
+                    // a new stream of records, so the consumer can read records beyond the last-fetched end-offset.
+                    // 2) When the internal topic becomes empty due to a size/time/start-offset breach, there
+                    // are no records to read.
+                    if (readOffset + 1 >= holder.endOffset || holder.endOffset.equals(holder.startOffset)) {
+                        markInitialized(utp);
                     } else {
-                        log.debug("Skipping sync-up of the 
remote-log-metadata-file for partition: [{}] , with remote log metadata 
partition{}, and no offset",
-                                topicIdPartition, metadataPartition);
+                        log.debug("The user-topic-partition {} could not be 
marked initialized since the read-offset is {} " +
+                                "but the end-offset is {} for the 
metadata-partition {}", utp, readOffset, holder.endOffset,
+                            metadataPartition);
                     }
+                } else {
+                    log.debug("The offset-holder is null for the 
metadata-partition {}. The consumer may not have picked" +
+                            " up the recent assignment", metadataPartition);
                 }
-
-                // Write partitionToConsumedOffsets into committed offsets file as we do not want to process them again
-                // in case of restarts.
-                committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
-                lastSyncedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets);
             }
-
-            lastSyncedTimeMs = time.milliseconds();
-        } catch (IOException e) {
-            throw new KafkaException("Error encountered while writing 
committed offsets to a local file", e);
+            isAllInitialized = isAllInitialized && utp.isInitialized;

Review Comment:
   That should be OK. @kamalcph, do you have any concerns? Since this is single-threaded, I don't see a race happening. The function where we mark the utp as assigned (`maybeWaitForPartitionAssignments`) and this method are executed by the same thread.
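   For anyone skimming later, here is a minimal sketch of the threading model, simplified from this PR's `run()` loop above (the names match the diff; this is not the full implementation):
   
   ```java
   // A single consumer-task thread drives the whole loop, so the two methods
   // below never run concurrently with each other.
   while (!isClosed) {
       if (hasAssignmentChanged) {
           maybeWaitForPartitionAssignments();  // marks utp.isAssigned on this thread
       }
       final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
       for (ConsumerRecord<byte[], byte[]> record : records) {
           processConsumerRecord(record);
       }
       maybeMarkUserPartitionsAsReady();        // reads utp.isAssigned on the same thread
   }
   ```
   
   Since both the write and the read of `utp.isAssigned` happen on that one thread, no synchronization is needed between them.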



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
