jeqo commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1288620620
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,393 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; - private final Time time; + private final Time time = new SystemTime(); Review Comment: Similar here. We can initialize this and uninitializedAt in the constructor, and pass SystemTime in the test setup. ########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,393 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; Review Comment: Is this really needed? I can see the value is overwritten to 10L in the test setup, but I was able to run the tests successfully with the default value of 100L. 
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,393 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; - private final Time time; + private final Time time = new SystemTime(); - // It indicates whether the closing process has been started or not. If it is set as true, - // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - - // It indicates whether the consumer needs to assign the partitions or not. This is set when it is - // determined that the consumer needs to be assigned with the updated partitions. - private volatile boolean assignPartitions = false; + // It indicates whether the ConsumerTask is closed or not. + private volatile boolean isClosed = false; + // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment + // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the + // ones it is no longer assigned to. + private volatile boolean isAssignmentChanged = true; // It represents a lock for any operations related to the assignedTopicPartitions. private final Object assignPartitionsLock = new Object(); // Remote log metadata topic partitions that consumer is assigned to. 
- private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet(); + private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet(); // User topic partitions that this broker is a leader/follower for. - private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet(); + private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap(); + private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet(); - // Map of remote log metadata topic partition to consumed offsets. Received consumer records - // may or may not have been processed based on the assigned topic partitions. - private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>(); + private long uninitializedAt = time.milliseconds(); + private boolean isAllUserTopicPartitionsInitialized; - // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile. - private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap(); + // Map of remote log metadata topic partition to consumed offsets. 
+ private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>(); + private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>(); - private final long committedOffsetSyncIntervalMs; - private CommittedOffsetsFile committedOffsetsFile; - private long lastSyncedTimeMs; + private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>(); + private boolean isOffsetsFetchFailed = false; + private long lastFailedFetchOffsetsTimestamp; - public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer, - String metadataTopicName, - RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, + public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, - Path committedOffsetsPath, - Time time, - long committedOffsetSyncIntervalMs) { - this.consumer = Objects.requireNonNull(consumer); - this.metadataTopicName = Objects.requireNonNull(metadataTopicName); + Function<Optional<String>, Consumer<byte[], byte[]>> consumerSupplier) { Review Comment: Why do we need a function here instead of passing the consumer as before? The input value seems to be ignored. 
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ########## @@ -357,7 +357,9 @@ public void configure(Map<String, ?> configs) { log.info("Started configuring topic-based RLMM with configs: {}", configs); rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs); - rlmmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount()); + if (rlmmTopicPartitioner == null) { + rlmmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount()); Review Comment: nit: ```suggestion rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount()); ``` ########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,393 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; - private final Time time; + private final Time time = new SystemTime(); - // It indicates whether the closing process has been started or not. If it is set as true, - // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - - // It indicates whether the consumer needs to assign the partitions or not. This is set when it is - // determined that the consumer needs to be assigned with the updated partitions. 
- private volatile boolean assignPartitions = false; + // It indicates whether the ConsumerTask is closed or not. + private volatile boolean isClosed = false; + // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment + // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the + // ones it is no longer assigned to. + private volatile boolean isAssignmentChanged = true; // It represents a lock for any operations related to the assignedTopicPartitions. private final Object assignPartitionsLock = new Object(); // Remote log metadata topic partitions that consumer is assigned to. - private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet(); + private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet(); // User topic partitions that this broker is a leader/follower for. - private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet(); + private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap(); + private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet(); - // Map of remote log metadata topic partition to consumed offsets. Received consumer records - // may or may not have been processed based on the assigned topic partitions. - private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>(); + private long uninitializedAt = time.milliseconds(); + private boolean isAllUserTopicPartitionsInitialized; - // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile. - private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap(); + // Map of remote log metadata topic partition to consumed offsets. 
+ private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>(); + private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>(); - private final long committedOffsetSyncIntervalMs; - private CommittedOffsetsFile committedOffsetsFile; - private long lastSyncedTimeMs; + private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>(); + private boolean isOffsetsFetchFailed = false; + private long lastFailedFetchOffsetsTimestamp; - public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer, - String metadataTopicName, - RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, + public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, - Path committedOffsetsPath, - Time time, - long committedOffsetSyncIntervalMs) { - this.consumer = Objects.requireNonNull(consumer); - this.metadataTopicName = Objects.requireNonNull(metadataTopicName); + Function<Optional<String>, Consumer<byte[], byte[]>> consumerSupplier) { + this.consumer = consumerSupplier.apply(Optional.empty()); this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); this.topicPartitioner = Objects.requireNonNull(topicPartitioner); - this.time = Objects.requireNonNull(time); - this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs; - - initializeConsumerAssignment(committedOffsetsPath); - } - - private void initializeConsumerAssignment(Path committedOffsetsPath) { - try { - committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); - } catch (IOException e) { - throw new KafkaException(e); - } - - Map<Integer, Long> committedOffsets = Collections.emptyMap(); - try { - // Load committed offset and assign them in the consumer. 
- committedOffsets = committedOffsetsFile.readEntries(); - } catch (IOException e) { - // Ignore the error and consumer consumes from the earliest offset. - log.error("Encountered error while building committed offsets from the file. " + - "Consumer will consume from the earliest offset for the assigned partitions.", e); - } - - if (!committedOffsets.isEmpty()) { - // Assign topic partitions from the earlier committed offsets file. - Set<Integer> earlierAssignedPartitions = committedOffsets.keySet(); - assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions); - Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream() - .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x)) - .collect(Collectors.toSet()); - consumer.assign(metadataTopicPartitions); - - // Seek to the committed offsets - for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) { - log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey()); - partitionToConsumedOffsets.put(entry.getKey(), entry.getValue()); - consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue()); - } - - lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets); - } } @Override public void run() { - log.info("Started Consumer task thread."); - lastSyncedTimeMs = time.milliseconds(); - try { - while (!closing) { - maybeWaitForPartitionsAssignment(); + log.info("Starting consumer task thread."); + while (!isClosed) { + try { + if (isAssignmentChanged) { + maybeWaitForPartitionsAssignment(); + } log.trace("Polling consumer to receive remote log metadata topic records"); - ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS)); + final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollIntervalMs)); for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { 
processConsumerRecord(record); } - - maybeSyncCommittedDataAndOffsets(false); + maybeMarkUserPartitionsAsReady(); + } catch (final WakeupException ex) { + // ignore logging the error + isClosed = true; + } catch (final RetriableException ex) { + log.warn("Retriable error occurred while processing the records. Retrying...", ex); + } catch (final Exception ex) { + isClosed = true; + log.error("Error occurred while processing the records", ex); } - } catch (Exception e) { - log.error("Error occurred in consumer task, close:[{}]", closing, e); - } finally { - maybeSyncCommittedDataAndOffsets(true); - closeConsumer(); - log.info("Exiting from consumer task thread"); } + try { + consumer.close(Duration.ofSeconds(30)); + } catch (final Exception e) { + log.error("Error encountered while closing the consumer", e); + } + log.info("Exited from consumer task thread"); } private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) { - // Taking assignPartitionsLock here as updateAssignmentsForPartitions changes assignedTopicPartitions - // and also calls remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for the removed - // partitions. 
- RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); - synchronized (assignPartitionsLock) { - if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) { - remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); - } else { - log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata); - } - log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); - partitionToConsumedOffsets.put(record.partition(), record.offset()); + final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); + if (canProcess(remoteLogMetadata, record.offset())) { + remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); + readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset()); + } else { + log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata); } + log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); + readOffsetsByMetadataPartition.put(record.partition(), record.offset()); + } + + private boolean canProcess(final RemoteLogMetadata metadata, final long recordOffset) { + final TopicIdPartition tpId = metadata.topicIdPartition(); + final Long readOffset = readOffsetsByUserTopicPartition.get(tpId); + return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset); } - private void maybeSyncCommittedDataAndOffsets(boolean forceSync) { - // Return immediately if there is no consumption from last time. 
- boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets); - if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) { - log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync); + private void maybeMarkUserPartitionsAsReady() { + if (isAllUserTopicPartitionsInitialized) { return; } - - try { - // Need to take lock on assignPartitionsLock as assignedTopicPartitions might - // get updated by other threads. - synchronized (assignPartitionsLock) { - for (TopicIdPartition topicIdPartition : assignedTopicPartitions) { - int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition); - Long offset = partitionToConsumedOffsets.get(metadataPartition); - if (offset != null) { - remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset); + maybeFetchBeginAndEndOffsets(); + boolean isAllInitialized = true; + for (final UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) { + if (utp.isAssigned && !utp.isInitialized) { + final Integer metadataPartition = utp.metadataPartition; + final BeginAndEndOffsetHolder holder = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition)); + // The offset-holder can be null, when the recent assignment wasn't picked up by the consumer. + if (holder != null) { + final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L); + // 1) The end-offset was fetched only once during reassignment. The metadata-partition can receive + // new stream of records, so the consumer can read records more than the last-fetched end-offset. + // 2) When the internal topic becomes empty due to breach by size/time/start-offset, then there + // are no records to read. 
+ if (readOffset + 1 >= holder.endOffset || holder.endOffset.equals(holder.beginOffset)) { + markInitialized(utp); } else { - log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset", - topicIdPartition, metadataPartition); + log.debug("The user-topic-partition {} could not be marked initialized since the read-offset is {} " + + "but the end-offset is {} for the metadata-partition {}", utp, readOffset, holder.endOffset, + metadataPartition); } + } else { + log.debug("The offset-holder is null for the metadata-partition {}. The consumer may not have picked" + + " up the recent assignment", metadataPartition); } - - // Write partitionToConsumedOffsets into committed offsets file as we do not want to process them again - // in case of restarts. - committedOffsetsFile.writeEntries(partitionToConsumedOffsets); - lastSyncedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets); } - - lastSyncedTimeMs = time.milliseconds(); - } catch (IOException e) { - throw new KafkaException("Error encountered while writing committed offsets to a local file", e); + isAllInitialized = isAllInitialized && utp.isInitialized; } - } - - private void closeConsumer() { - log.info("Closing the consumer instance"); - try { - consumer.close(Duration.ofSeconds(30)); - } catch (Exception e) { - log.error("Error encountered while closing the consumer", e); + if (isAllInitialized) { + log.info("Initialized for all the {} assigned user-partitions mapped to the {} meta-partitions in {} ms", + assignedUserTopicIdPartitions.size(), assignedMetadataPartitions.size(), + time.milliseconds() - uninitializedAt); } + isAllUserTopicPartitionsInitialized = isAllInitialized; } - private void maybeWaitForPartitionsAssignment() { - Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet(); + void maybeWaitForPartitionsAssignment() throws InterruptedException { + // Snapshots of the metadata-partition and 
user-topic-partition are used to reduce the scope of the + // synchronization block. + // 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the user-topic-partitions from the request + // handler threads. Those threads should not be blocked for a long time, therefore scope of the + // synchronization block is reduced to bare minimum. + // 2) Note that the consumer#position, consumer#seekToBeginning, consumer#seekToEnd and the other consumer APIs + // response times are un-predictable. Those should not be kept in the synchronization block. + final Set<Integer> metadataPartitionSnapshot = new HashSet<>(); + final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot = new HashSet<>(); synchronized (assignPartitionsLock) { - // If it is closing, return immediately. This should be inside the assignPartitionsLock as the closing is updated - // in close() method with in the same lock to avoid any race conditions. - if (closing) { - return; + while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) { + log.debug("Waiting for remote log metadata partitions to be assigned"); + assignPartitionsLock.wait(); } - - while (assignedMetaPartitions.isEmpty()) { - // If no partitions are assigned, wait until they are assigned. - log.debug("Waiting for assigned remote log metadata partitions.."); - try { - // No timeout is set here, as it is always notified. Even when it is closed, the race can happen - // between the thread calling this method and the thread calling close(). We should have a check - // for closing as that might have been set and notified with assignPartitionsLock by `close` - // method. 
- assignPartitionsLock.wait(); - - if (closing) { - return; - } - } catch (InterruptedException e) { - throw new KafkaException(e); - } - } - - if (assignPartitions) { - assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions); - // Removing unassigned meta partitions from partitionToConsumedOffsets and partitionToCommittedOffsets - partitionToConsumedOffsets.entrySet().removeIf(entry -> !assignedMetaPartitions.contains(entry.getKey())); - - assignPartitions = false; + if (!isClosed && isAssignmentChanged) { + assignedUserTopicIdPartitions.values().forEach(utp -> { + metadataPartitionSnapshot.add(utp.metadataPartition); + assignedUserTopicIdPartitionsSnapshot.add(utp); + }); + isAssignmentChanged = false; } } - - if (!assignedMetaPartitionsSnapshot.isEmpty()) { - executeReassignment(assignedMetaPartitionsSnapshot); + if (!metadataPartitionSnapshot.isEmpty()) { + final Set<TopicPartition> remoteLogPartitions = toRemoteLogPartitions(metadataPartitionSnapshot); + consumer.assign(remoteLogPartitions); + this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot); + // for newly assigned user-partitions, read from the beginning of the corresponding metadata partition + final Set<TopicPartition> seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot + .stream() + .filter(utp -> !utp.isAssigned) + .map(utp -> toRemoteLogPartition(utp.metadataPartition)) + .collect(Collectors.toSet()); + consumer.seekToBeginning(seekToBeginOffsetPartitions); + // for other metadata partitions, read from the offset where the processing left last time. + remoteLogPartitions.stream() + .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) && + readOffsetsByMetadataPartition.containsKey(tp.partition())) + .forEach(tp -> consumer.seek(tp, readOffsetsByMetadataPartition.get(tp.partition()))); + // mark all the user-topic-partitions as assigned to the consumer. 
+ assignedUserTopicIdPartitionsSnapshot.forEach(utp -> { + if (!utp.isAssigned) { + // Note that there can be a race between `remove` and `add` partition assignment. Calling the + // `maybeLoadPartition` here again to be sure that the partition gets loaded on the handler. + remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition); + utp.isAssigned = true; + } + }); + processedAssignmentOfUserTopicIdPartitions = assignedUserTopicIdPartitionsSnapshot.stream() + .map(utp -> utp.topicIdPartition).collect(Collectors.toSet()); + clearResourcesForUnassignedUserTopicPartitions(assignedUserTopicIdPartitionsSnapshot); + isAllUserTopicPartitionsInitialized = false; + uninitializedAt = time.milliseconds(); + fetchBeginAndEndOffsets(); } } - private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) { - Set<TopicPartition> assignedMetaTopicPartitions = - assignedMetaPartitionsSnapshot.stream() - .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum)) - .collect(Collectors.toSet()); - log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions); - consumer.assign(assignedMetaTopicPartitions); + private void clearResourcesForUnassignedUserTopicPartitions(Set<UserTopicIdPartition> assignedUTPs) { + Set<TopicIdPartition> assignedPartitions = assignedUTPs.stream() + .map(utp -> utp.topicIdPartition).collect(Collectors.toSet()); + // Note that there can be previously assigned user-topic-partitions where no records are there to read + // (eg) none of the segments for a partition were uploaded. Those partition resources won't be cleared. + // It can be fixed later when required since they are empty resources. 
+ Set<TopicIdPartition> unassignedPartitions = readOffsetsByUserTopicPartition.keySet() + .stream() + .filter(e -> !assignedPartitions.contains(e)) + .collect(Collectors.toSet()); + unassignedPartitions.forEach(unassignedPartition -> { + remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition); + readOffsetsByUserTopicPartition.remove(unassignedPartition); + }); + log.info("Unassigned user-topic-partitions: {}", unassignedPartitions.size()); } - public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) { - updateAssignmentsForPartitions(partitions, Collections.emptySet()); + public void addAssignmentsForPartitions(final Set<TopicIdPartition> partitions) { + updateAssignments(Objects.requireNonNull(partitions), Collections.emptySet()); } - public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) { - updateAssignmentsForPartitions(Collections.emptySet(), partitions); + public void removeAssignmentsForPartitions(final Set<TopicIdPartition> partitions) { + updateAssignments(Collections.emptySet(), Objects.requireNonNull(partitions)); } - private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartitions, - Set<TopicIdPartition> removedPartitions) { - log.info("Updating assignments for addedPartitions: {} and removedPartition: {}", addedPartitions, removedPartitions); + private void updateAssignments(final Set<TopicIdPartition> addedPartitions, + final Set<TopicIdPartition> removedPartitions) { + log.info("Updating assignments for partitions added: {} and removed: {}", addedPartitions, removedPartitions); + if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) { + synchronized (assignPartitionsLock) { + // Make a copy of the existing assignments and update the copy. 
+ final Map<TopicIdPartition, UserTopicIdPartition> updatedUserPartitions = new HashMap<>(assignedUserTopicIdPartitions); + addedPartitions.forEach(tpId -> updatedUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId))); + removedPartitions.forEach(updatedUserPartitions::remove); + if (!updatedUserPartitions.equals(assignedUserTopicIdPartitions)) { + assignedUserTopicIdPartitions = Collections.unmodifiableMap(updatedUserPartitions); + isAssignmentChanged = true; + log.debug("Assigned user-topic-partitions: {}", assignedUserTopicIdPartitions); + assignPartitionsLock.notifyAll(); + } + } + } + } - Objects.requireNonNull(addedPartitions, "addedPartitions must not be null"); - Objects.requireNonNull(removedPartitions, "removedPartitions must not be null"); + public Optional<Long> receivedOffsetForPartition(final int partition) { + return Optional.ofNullable(readOffsetsByMetadataPartition.get(partition)); + } - if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) { - return; - } + public boolean isMetadataPartitionAssigned(final int partition) { + return assignedMetadataPartitions.contains(partition); + } - synchronized (assignPartitionsLock) { - Set<TopicIdPartition> updatedReassignedPartitions = new HashSet<>(assignedTopicPartitions); - updatedReassignedPartitions.addAll(addedPartitions); - updatedReassignedPartitions.removeAll(removedPartitions); - Set<Integer> updatedAssignedMetaPartitions = new HashSet<>(); - for (TopicIdPartition tp : updatedReassignedPartitions) { - updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp)); - } + public boolean isUserPartitionAssigned(final TopicIdPartition partition) { + final UserTopicIdPartition utp = assignedUserTopicIdPartitions.get(partition); + return utp != null && utp.isAssigned; + } - // Clear removed topic partitions from in-memory cache. 
- for (TopicIdPartition removedPartition : removedPartitions) { - remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition); + @Override + public void close() { + if (!isClosed) { + log.info("Closing the instance"); + synchronized (assignPartitionsLock) { + isClosed = true; + assignedUserTopicIdPartitions.values().forEach(this::markInitialized); + consumer.wakeup(); + assignPartitionsLock.notifyAll(); } + } + } - assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions); - log.debug("Assigned topic partitions: {}", assignedTopicPartitions); + public Set<Integer> metadataPartitionsAssigned() { + return Collections.unmodifiableSet(assignedMetadataPartitions); + } - if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) { - assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions); - log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions); + private void fetchBeginAndEndOffsets() { + try { + final Set<TopicPartition> unInitializedPartitions = assignedUserTopicIdPartitions.values().stream() Review Comment: nit ```suggestion final Set<TopicPartition> uninitializedPartitions = assignedUserTopicIdPartitions.values().stream() ``` ########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java: ########## @@ -185,4 +190,15 @@ public void maybeLoadPartition(TopicIdPartition partition) { topicIdPartition -> new FileBasedRemoteLogMetadataCache(topicIdPartition, partitionLogDirectory(topicIdPartition.topicPartition()))); } Review Comment: nit: `maybeLoadPartition` should have `@Override` annotation now. 
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,393 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; - private final Time time; + private final Time time = new SystemTime(); - // It indicates whether the closing process has been started or not. If it is set as true, - // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - - // It indicates whether the consumer needs to assign the partitions or not. This is set when it is - // determined that the consumer needs to be assigned with the updated partitions. - private volatile boolean assignPartitions = false; + // It indicates whether the ConsumerTask is closed or not. + private volatile boolean isClosed = false; + // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment + // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the + // ones it is no longer assigned to. + private volatile boolean isAssignmentChanged = true; // It represents a lock for any operations related to the assignedTopicPartitions. private final Object assignPartitionsLock = new Object(); // Remote log metadata topic partitions that consumer is assigned to. 
- private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet(); + private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet(); // User topic partitions that this broker is a leader/follower for. - private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet(); + private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap(); + private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet(); - // Map of remote log metadata topic partition to consumed offsets. Received consumer records - // may or may not have been processed based on the assigned topic partitions. - private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>(); + private long uninitializedAt = time.milliseconds(); + private boolean isAllUserTopicPartitionsInitialized; - // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile. - private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap(); + // Map of remote log metadata topic partition to consumed offsets. 
+ private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>(); + private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>(); - private final long committedOffsetSyncIntervalMs; - private CommittedOffsetsFile committedOffsetsFile; - private long lastSyncedTimeMs; + private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>(); + private boolean isOffsetsFetchFailed = false; + private long lastFailedFetchOffsetsTimestamp; - public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer, - String metadataTopicName, - RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, + public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, - Path committedOffsetsPath, Review Comment: I haven't looked too deeply into this, but could you elaborate on why this file is not needed anymore? btw, `COMMITTED_OFFSETS_FILE_NAME` can be removed from ConsumerManager. ########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ########## @@ -357,7 +357,9 @@ public void configure(Map<String, ?> configs) { log.info("Started configuring topic-based RLMM with configs: {}", configs); rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs); - rlmmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount()); + if (rlmmTopicPartitioner == null) { Review Comment: Can't we just set the topic partitioner _after_ configure in the test harness and remove this validation here? 
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,393 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; - private final Time time; + private final Time time = new SystemTime(); - // It indicates whether the closing process has been started or not. If it is set as true, - // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - - // It indicates whether the consumer needs to assign the partitions or not. This is set when it is - // determined that the consumer needs to be assigned with the updated partitions. - private volatile boolean assignPartitions = false; + // It indicates whether the ConsumerTask is closed or not. + private volatile boolean isClosed = false; + // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment + // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the + // ones it is no longer assigned to. 
+ private volatile boolean isAssignmentChanged = true; Review Comment: nit ```suggestion private volatile boolean hasAssignmentChanged = true; ``` ########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ########## @@ -515,6 +517,11 @@ public void startConsumerThread() { } } Review Comment: btw, `startConsumerThread` is unused; can it be removed? ########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,393 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; - private final Time time; + private final Time time = new SystemTime(); - // It indicates whether the closing process has been started or not. If it is set as true, - // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - - // It indicates whether the consumer needs to assign the partitions or not. This is set when it is - // determined that the consumer needs to be assigned with the updated partitions. - private volatile boolean assignPartitions = false; + // It indicates whether the ConsumerTask is closed or not. + private volatile boolean isClosed = false; + // It indicates whether the user topic partition assignment to the consumer has changed or not. 
If the assignment + // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the + // ones it is no longer assigned to. + private volatile boolean isAssignmentChanged = true; // It represents a lock for any operations related to the assignedTopicPartitions. private final Object assignPartitionsLock = new Object(); // Remote log metadata topic partitions that consumer is assigned to. - private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet(); + private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet(); // User topic partitions that this broker is a leader/follower for. - private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet(); + private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap(); + private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet(); - // Map of remote log metadata topic partition to consumed offsets. Received consumer records - // may or may not have been processed based on the assigned topic partitions. - private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>(); + private long uninitializedAt = time.milliseconds(); + private boolean isAllUserTopicPartitionsInitialized; - // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile. - private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap(); + // Map of remote log metadata topic partition to consumed offsets. 
+ private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>(); + private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>(); - private final long committedOffsetSyncIntervalMs; - private CommittedOffsetsFile committedOffsetsFile; - private long lastSyncedTimeMs; + private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>(); + private boolean isOffsetsFetchFailed = false; + private long lastFailedFetchOffsetsTimestamp; - public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer, - String metadataTopicName, - RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, + public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, - Path committedOffsetsPath, - Time time, - long committedOffsetSyncIntervalMs) { - this.consumer = Objects.requireNonNull(consumer); - this.metadataTopicName = Objects.requireNonNull(metadataTopicName); + Function<Optional<String>, Consumer<byte[], byte[]>> consumerSupplier) { + this.consumer = consumerSupplier.apply(Optional.empty()); this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); this.topicPartitioner = Objects.requireNonNull(topicPartitioner); - this.time = Objects.requireNonNull(time); - this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs; - - initializeConsumerAssignment(committedOffsetsPath); - } - - private void initializeConsumerAssignment(Path committedOffsetsPath) { - try { - committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); - } catch (IOException e) { - throw new KafkaException(e); - } - - Map<Integer, Long> committedOffsets = Collections.emptyMap(); - try { - // Load committed offset and assign them in the consumer. 
- committedOffsets = committedOffsetsFile.readEntries(); - } catch (IOException e) { - // Ignore the error and consumer consumes from the earliest offset. - log.error("Encountered error while building committed offsets from the file. " + - "Consumer will consume from the earliest offset for the assigned partitions.", e); - } - - if (!committedOffsets.isEmpty()) { - // Assign topic partitions from the earlier committed offsets file. - Set<Integer> earlierAssignedPartitions = committedOffsets.keySet(); - assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions); - Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream() - .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x)) - .collect(Collectors.toSet()); - consumer.assign(metadataTopicPartitions); - - // Seek to the committed offsets - for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) { - log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey()); - partitionToConsumedOffsets.put(entry.getKey(), entry.getValue()); - consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue()); - } - - lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets); - } } @Override public void run() { - log.info("Started Consumer task thread."); - lastSyncedTimeMs = time.milliseconds(); - try { - while (!closing) { - maybeWaitForPartitionsAssignment(); + log.info("Starting consumer task thread."); + while (!isClosed) { + try { + if (isAssignmentChanged) { + maybeWaitForPartitionsAssignment(); + } log.trace("Polling consumer to receive remote log metadata topic records"); - ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS)); + final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollIntervalMs)); for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { 
processConsumerRecord(record); } - - maybeSyncCommittedDataAndOffsets(false); + maybeMarkUserPartitionsAsReady(); + } catch (final WakeupException ex) { + // ignore logging the error + isClosed = true; + } catch (final RetriableException ex) { + log.warn("Retriable error occurred while processing the records. Retrying...", ex); + } catch (final Exception ex) { + isClosed = true; + log.error("Error occurred while processing the records", ex); } - } catch (Exception e) { - log.error("Error occurred in consumer task, close:[{}]", closing, e); - } finally { - maybeSyncCommittedDataAndOffsets(true); - closeConsumer(); - log.info("Exiting from consumer task thread"); } + try { + consumer.close(Duration.ofSeconds(30)); + } catch (final Exception e) { + log.error("Error encountered while closing the consumer", e); + } + log.info("Exited from consumer task thread"); } private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) { - // Taking assignPartitionsLock here as updateAssignmentsForPartitions changes assignedTopicPartitions - // and also calls remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for the removed - // partitions. 
- RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); - synchronized (assignPartitionsLock) { - if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) { - remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); - } else { - log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata); - } - log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); - partitionToConsumedOffsets.put(record.partition(), record.offset()); + final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); + if (canProcess(remoteLogMetadata, record.offset())) { + remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); + readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset()); + } else { + log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata); } + log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); + readOffsetsByMetadataPartition.put(record.partition(), record.offset()); + } + + private boolean canProcess(final RemoteLogMetadata metadata, final long recordOffset) { + final TopicIdPartition tpId = metadata.topicIdPartition(); + final Long readOffset = readOffsetsByUserTopicPartition.get(tpId); + return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset); } - private void maybeSyncCommittedDataAndOffsets(boolean forceSync) { - // Return immediately if there is no consumption from last time. 
- boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets); - if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) { - log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync); + private void maybeMarkUserPartitionsAsReady() { + if (isAllUserTopicPartitionsInitialized) { return; } - - try { - // Need to take lock on assignPartitionsLock as assignedTopicPartitions might - // get updated by other threads. - synchronized (assignPartitionsLock) { - for (TopicIdPartition topicIdPartition : assignedTopicPartitions) { - int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition); - Long offset = partitionToConsumedOffsets.get(metadataPartition); - if (offset != null) { - remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset); + maybeFetchBeginAndEndOffsets(); + boolean isAllInitialized = true; + for (final UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) { + if (utp.isAssigned && !utp.isInitialized) { + final Integer metadataPartition = utp.metadataPartition; + final BeginAndEndOffsetHolder holder = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition)); + // The offset-holder can be null, when the recent assignment wasn't picked up by the consumer. + if (holder != null) { + final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L); + // 1) The end-offset was fetched only once during reassignment. The metadata-partition can receive + // new stream of records, so the consumer can read records more than the last-fetched end-offset. + // 2) When the internal topic becomes empty due to breach by size/time/start-offset, then there + // are no records to read. 
+ if (readOffset + 1 >= holder.endOffset || holder.endOffset.equals(holder.beginOffset)) { + markInitialized(utp); } else { - log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset", - topicIdPartition, metadataPartition); + log.debug("The user-topic-partition {} could not be marked initialized since the read-offset is {} " + + "but the end-offset is {} for the metadata-partition {}", utp, readOffset, holder.endOffset, + metadataPartition); } + } else { + log.debug("The offset-holder is null for the metadata-partition {}. The consumer may not have picked" + + " up the recent assignment", metadataPartition); } - - // Write partitionToConsumedOffsets into committed offsets file as we do not want to process them again - // in case of restarts. - committedOffsetsFile.writeEntries(partitionToConsumedOffsets); - lastSyncedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets); } - - lastSyncedTimeMs = time.milliseconds(); - } catch (IOException e) { - throw new KafkaException("Error encountered while writing committed offsets to a local file", e); + isAllInitialized = isAllInitialized && utp.isInitialized; } - } - - private void closeConsumer() { - log.info("Closing the consumer instance"); - try { - consumer.close(Duration.ofSeconds(30)); - } catch (Exception e) { - log.error("Error encountered while closing the consumer", e); + if (isAllInitialized) { + log.info("Initialized for all the {} assigned user-partitions mapped to the {} meta-partitions in {} ms", + assignedUserTopicIdPartitions.size(), assignedMetadataPartitions.size(), + time.milliseconds() - uninitializedAt); } + isAllUserTopicPartitionsInitialized = isAllInitialized; } - private void maybeWaitForPartitionsAssignment() { - Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet(); + void maybeWaitForPartitionsAssignment() throws InterruptedException { + // Snapshots of the metadata-partition and 
user-topic-partition are used to reduce the scope of the + // synchronization block. + // 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the user-topic-partitions from the request + // handler threads. Those threads should not be blocked for a long time, therefore scope of the + // synchronization block is reduced to bare minimum. + // 2) Note that the consumer#position, consumer#seekToBeginning, consumer#seekToEnd and the other consumer APIs + // response times are un-predictable. Those should not be kept in the synchronization block. + final Set<Integer> metadataPartitionSnapshot = new HashSet<>(); + final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot = new HashSet<>(); synchronized (assignPartitionsLock) { - // If it is closing, return immediately. This should be inside the assignPartitionsLock as the closing is updated - // in close() method with in the same lock to avoid any race conditions. - if (closing) { - return; + while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) { + log.debug("Waiting for remote log metadata partitions to be assigned"); + assignPartitionsLock.wait(); } - - while (assignedMetaPartitions.isEmpty()) { - // If no partitions are assigned, wait until they are assigned. - log.debug("Waiting for assigned remote log metadata partitions.."); - try { - // No timeout is set here, as it is always notified. Even when it is closed, the race can happen - // between the thread calling this method and the thread calling close(). We should have a check - // for closing as that might have been set and notified with assignPartitionsLock by `close` - // method. 
- assignPartitionsLock.wait(); - - if (closing) { - return; - } - } catch (InterruptedException e) { - throw new KafkaException(e); - } - } - - if (assignPartitions) { - assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions); - // Removing unassigned meta partitions from partitionToConsumedOffsets and partitionToCommittedOffsets - partitionToConsumedOffsets.entrySet().removeIf(entry -> !assignedMetaPartitions.contains(entry.getKey())); - - assignPartitions = false; + if (!isClosed && isAssignmentChanged) { + assignedUserTopicIdPartitions.values().forEach(utp -> { + metadataPartitionSnapshot.add(utp.metadataPartition); + assignedUserTopicIdPartitionsSnapshot.add(utp); + }); + isAssignmentChanged = false; } } - - if (!assignedMetaPartitionsSnapshot.isEmpty()) { - executeReassignment(assignedMetaPartitionsSnapshot); + if (!metadataPartitionSnapshot.isEmpty()) { + final Set<TopicPartition> remoteLogPartitions = toRemoteLogPartitions(metadataPartitionSnapshot); + consumer.assign(remoteLogPartitions); + this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot); + // for newly assigned user-partitions, read from the beginning of the corresponding metadata partition + final Set<TopicPartition> seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot + .stream() + .filter(utp -> !utp.isAssigned) + .map(utp -> toRemoteLogPartition(utp.metadataPartition)) + .collect(Collectors.toSet()); + consumer.seekToBeginning(seekToBeginOffsetPartitions); + // for other metadata partitions, read from the offset where the processing left last time. + remoteLogPartitions.stream() + .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) && + readOffsetsByMetadataPartition.containsKey(tp.partition())) + .forEach(tp -> consumer.seek(tp, readOffsetsByMetadataPartition.get(tp.partition()))); + // mark all the user-topic-partitions as assigned to the consumer. 
+ assignedUserTopicIdPartitionsSnapshot.forEach(utp -> { + if (!utp.isAssigned) { + // Note that there can be a race between `remove` and `add` partition assignment. Calling the + // `maybeLoadPartition` here again to be sure that the partition gets loaded on the handler. + remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition); + utp.isAssigned = true; + } + }); + processedAssignmentOfUserTopicIdPartitions = assignedUserTopicIdPartitionsSnapshot.stream() + .map(utp -> utp.topicIdPartition).collect(Collectors.toSet()); + clearResourcesForUnassignedUserTopicPartitions(assignedUserTopicIdPartitionsSnapshot); + isAllUserTopicPartitionsInitialized = false; + uninitializedAt = time.milliseconds(); + fetchBeginAndEndOffsets(); } } - private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) { - Set<TopicPartition> assignedMetaTopicPartitions = - assignedMetaPartitionsSnapshot.stream() - .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum)) - .collect(Collectors.toSet()); - log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions); - consumer.assign(assignedMetaTopicPartitions); + private void clearResourcesForUnassignedUserTopicPartitions(Set<UserTopicIdPartition> assignedUTPs) { + Set<TopicIdPartition> assignedPartitions = assignedUTPs.stream() + .map(utp -> utp.topicIdPartition).collect(Collectors.toSet()); + // Note that there can be previously assigned user-topic-partitions where no records are there to read + // (eg) none of the segments for a partition were uploaded. Those partition resources won't be cleared. + // It can be fixed later when required since they are empty resources. 
+ Set<TopicIdPartition> unassignedPartitions = readOffsetsByUserTopicPartition.keySet() + .stream() + .filter(e -> !assignedPartitions.contains(e)) + .collect(Collectors.toSet()); + unassignedPartitions.forEach(unassignedPartition -> { + remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition); + readOffsetsByUserTopicPartition.remove(unassignedPartition); + }); + log.info("Unassigned user-topic-partitions: {}", unassignedPartitions.size()); } - public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) { - updateAssignmentsForPartitions(partitions, Collections.emptySet()); + public void addAssignmentsForPartitions(final Set<TopicIdPartition> partitions) { + updateAssignments(Objects.requireNonNull(partitions), Collections.emptySet()); } - public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) { - updateAssignmentsForPartitions(Collections.emptySet(), partitions); + public void removeAssignmentsForPartitions(final Set<TopicIdPartition> partitions) { + updateAssignments(Collections.emptySet(), Objects.requireNonNull(partitions)); } - private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartitions, - Set<TopicIdPartition> removedPartitions) { - log.info("Updating assignments for addedPartitions: {} and removedPartition: {}", addedPartitions, removedPartitions); + private void updateAssignments(final Set<TopicIdPartition> addedPartitions, + final Set<TopicIdPartition> removedPartitions) { + log.info("Updating assignments for partitions added: {} and removed: {}", addedPartitions, removedPartitions); + if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) { + synchronized (assignPartitionsLock) { + // Make a copy of the existing assignments and update the copy. 
+ final Map<TopicIdPartition, UserTopicIdPartition> updatedUserPartitions = new HashMap<>(assignedUserTopicIdPartitions); + addedPartitions.forEach(tpId -> updatedUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId))); + removedPartitions.forEach(updatedUserPartitions::remove); + if (!updatedUserPartitions.equals(assignedUserTopicIdPartitions)) { + assignedUserTopicIdPartitions = Collections.unmodifiableMap(updatedUserPartitions); + isAssignmentChanged = true; + log.debug("Assigned user-topic-partitions: {}", assignedUserTopicIdPartitions); + assignPartitionsLock.notifyAll(); + } + } + } + } - Objects.requireNonNull(addedPartitions, "addedPartitions must not be null"); - Objects.requireNonNull(removedPartitions, "removedPartitions must not be null"); + public Optional<Long> receivedOffsetForPartition(final int partition) { + return Optional.ofNullable(readOffsetsByMetadataPartition.get(partition)); + } - if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) { - return; - } + public boolean isMetadataPartitionAssigned(final int partition) { + return assignedMetadataPartitions.contains(partition); + } - synchronized (assignPartitionsLock) { - Set<TopicIdPartition> updatedReassignedPartitions = new HashSet<>(assignedTopicPartitions); - updatedReassignedPartitions.addAll(addedPartitions); - updatedReassignedPartitions.removeAll(removedPartitions); - Set<Integer> updatedAssignedMetaPartitions = new HashSet<>(); - for (TopicIdPartition tp : updatedReassignedPartitions) { - updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp)); - } + public boolean isUserPartitionAssigned(final TopicIdPartition partition) { + final UserTopicIdPartition utp = assignedUserTopicIdPartitions.get(partition); + return utp != null && utp.isAssigned; + } - // Clear removed topic partitions from in-memory cache. 
- for (TopicIdPartition removedPartition : removedPartitions) { - remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition); + @Override + public void close() { + if (!isClosed) { + log.info("Closing the instance"); + synchronized (assignPartitionsLock) { + isClosed = true; + assignedUserTopicIdPartitions.values().forEach(this::markInitialized); + consumer.wakeup(); + assignPartitionsLock.notifyAll(); } + } + } - assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions); - log.debug("Assigned topic partitions: {}", assignedTopicPartitions); + public Set<Integer> metadataPartitionsAssigned() { + return Collections.unmodifiableSet(assignedMetadataPartitions); + } - if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) { - assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions); - log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions); + private void fetchBeginAndEndOffsets() { + try { + final Set<TopicPartition> unInitializedPartitions = assignedUserTopicIdPartitions.values().stream() + .filter(utp -> utp.isAssigned && !utp.isInitialized) + .map(utp -> toRemoteLogPartition(utp.metadataPartition)) + .collect(Collectors.toSet()); + // Removing the previous offset holder if it exists. During reassignment, if the list-offset + // call to `earliest` and `latest` offset fails, then we should not use the previous values. 
+ unInitializedPartitions.forEach(x -> offsetHolderByMetadataPartition.remove(x)); + if (!unInitializedPartitions.isEmpty()) { + Map<TopicPartition, Long> endOffsets = consumer.endOffsets(unInitializedPartitions); + Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(unInitializedPartitions); + offsetHolderByMetadataPartition = endOffsets.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> new BeginAndEndOffsetHolder(beginOffsets.get(e.getKey()), e.getValue()))); - assignPartitions = true; - assignPartitionsLock.notifyAll(); - } else { - log.debug("No change in assigned metadata topic partitions: {}", assignedMetaPartitions); } + isOffsetsFetchFailed = false; + } catch (final RetriableException ex) { + // ignore LEADER_NOT_AVAILABLE error, this can happen when the partition leader is not yet assigned. + isOffsetsFetchFailed = true; + lastFailedFetchOffsetsTimestamp = time.milliseconds(); } } - public Optional<Long> receivedOffsetForPartition(int partition) { - return Optional.ofNullable(partitionToConsumedOffsets.get(partition)); + private void maybeFetchBeginAndEndOffsets() { + // If the leader for a `__remote_log_metadata` partition is not available, then the call to `ListOffsets` + // will fail after the default timeout of 1 min. Added a delay of 5 min in between the retries to prevent the + // thread from aggressively fetching the list offsets. During this time, the recently reassigned + // user-topic-partitions won't be marked as initialized. 
+ if (isOffsetsFetchFailed && lastFailedFetchOffsetsTimestamp + 300_000 < time.milliseconds()) { + fetchBeginAndEndOffsets(); + } } - public boolean isPartitionAssigned(int partition) { - return assignedMetaPartitions.contains(partition); + private UserTopicIdPartition newUserTopicIdPartition(final TopicIdPartition tpId) { + return new UserTopicIdPartition(tpId, topicPartitioner.metadataPartition(tpId)); } - public void close() { - if (!closing) { - synchronized (assignPartitionsLock) { - // Closing should be updated only after acquiring the lock to avoid race in - // maybeWaitForPartitionsAssignment() where it waits on assignPartitionsLock. It should not wait - // if the closing is already set. - closing = true; - consumer.wakeup(); - assignPartitionsLock.notifyAll(); - } + private void markInitialized(final UserTopicIdPartition utp) { + // Silently not initialize the utp + if (!utp.isAssigned) { + log.warn("Tried to initialize a UTP: {} that was not yet assigned!", utp); + return; + } + if (!utp.isInitialized) { + remotePartitionMetadataEventHandler.markInitialized(utp.topicIdPartition); + utp.isInitialized = true; } } - public Set<Integer> metadataPartitionsAssigned() { - return Collections.unmodifiableSet(assignedMetaPartitions); + static Set<TopicPartition> toRemoteLogPartitions(final Set<Integer> partitions) { + return partitions.stream() + .map(ConsumerTask::toRemoteLogPartition) + .collect(Collectors.toSet()); + } + + static TopicPartition toRemoteLogPartition(int partition) { + return new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partition); + } + + static class UserTopicIdPartition { + private final TopicIdPartition topicIdPartition; + private final Integer metadataPartition; + // The `utp` will be initialized once it reads all the existing events from the remote log metadata topic. 
+ boolean isInitialized; + // denotes whether this `utp` is assigned to the consumer + boolean isAssigned; + + /** + * UserTopicIdPartition denotes the user topic-partitions for which this broker acts as a leader/follower of. + * + * @param tpId the unique topic partition identifier + * @param metadataPartition the remote log metadata partition mapped for this user-topic-partition. + */ + public UserTopicIdPartition(final TopicIdPartition tpId, final Integer metadataPartition) { + this.topicIdPartition = Objects.requireNonNull(tpId); + this.metadataPartition = Objects.requireNonNull(metadataPartition); + this.isInitialized = false; + this.isAssigned = false; + } + + @Override + public String toString() { + return "UserTopicIdPartition{" + + "topicIdPartition=" + topicIdPartition + + ", metadataPartition=" + metadataPartition + + ", isInitialized=" + isInitialized + + ", isAssigned=" + isAssigned + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + UserTopicIdPartition that = (UserTopicIdPartition) o; + return topicIdPartition.equals(that.topicIdPartition) && metadataPartition.equals(that.metadataPartition); + } + + @Override + public int hashCode() { + return Objects.hash(topicIdPartition, metadataPartition); + } + } + + static class BeginAndEndOffsetHolder { + Long beginOffset; Review Comment: nit: RemoteLogSegmentMetadata uses start|endOffset. ```suggestion static class StartAndEndOffsets { Long startOffset; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org