abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1285212647
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,387 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; - private final Time time; + private final Time time = new SystemTime(); + // TODO - Update comments below // It indicates whether the closing process has been started or not. If it is set as true, // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - + private volatile boolean isClosed = false; // It indicates whether the consumer needs to assign the partitions or not. This is set when it is // determined that the consumer needs to be assigned with the updated partitions. - private volatile boolean assignPartitions = false; + private volatile boolean isAssignmentChanged = true; // It represents a lock for any operations related to the assignedTopicPartitions. private final Object assignPartitionsLock = new Object(); // Remote log metadata topic partitions that consumer is assigned to. - private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet(); + private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet(); // User topic partitions that this broker is a leader/follower for. - private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet(); + private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap(); + private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet(); - // Map of remote log metadata topic partition to consumed offsets. Received consumer records - // may or may not have been processed based on the assigned topic partitions. - private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>(); + private long uninitializedAt = time.milliseconds(); + private boolean isAllUserTopicPartitionsInitialized; - // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile. - private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap(); + // Map of remote log metadata topic partition to consumed offsets. + private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>(); + private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>(); - private final long committedOffsetSyncIntervalMs; - private CommittedOffsetsFile committedOffsetsFile; - private long lastSyncedTimeMs; + private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>(); + private boolean isOffsetsFetchFailed = false; + private long lastFailedFetchOffsetsTimestamp; - public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer, - String metadataTopicName, - RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, + public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, - Path committedOffsetsPath, - Time time, - long committedOffsetSyncIntervalMs) { - this.consumer = Objects.requireNonNull(consumer); - this.metadataTopicName = Objects.requireNonNull(metadataTopicName); + Function<Optional<String>, Consumer<byte[], byte[]>> consumerSupplier) { + this.consumer = consumerSupplier.apply(Optional.empty()); this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); this.topicPartitioner = Objects.requireNonNull(topicPartitioner); - this.time = Objects.requireNonNull(time); - this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs; - - initializeConsumerAssignment(committedOffsetsPath); - } - - private void initializeConsumerAssignment(Path committedOffsetsPath) { - try { - committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); - } catch (IOException e) { - throw new KafkaException(e); - } - - Map<Integer, Long> committedOffsets = Collections.emptyMap(); - try { - // Load committed offset and assign them in the consumer. - committedOffsets = committedOffsetsFile.readEntries(); - } catch (IOException e) { - // Ignore the error and consumer consumes from the earliest offset. - log.error("Encountered error while building committed offsets from the file. " + - "Consumer will consume from the earliest offset for the assigned partitions.", e); - } - - if (!committedOffsets.isEmpty()) { - // Assign topic partitions from the earlier committed offsets file. - Set<Integer> earlierAssignedPartitions = committedOffsets.keySet(); - assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions); - Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream() - .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x)) - .collect(Collectors.toSet()); - consumer.assign(metadataTopicPartitions); - - // Seek to the committed offsets - for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) { - log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey()); - partitionToConsumedOffsets.put(entry.getKey(), entry.getValue()); - consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue()); - } - - lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets); - } } @Override public void run() { - log.info("Started Consumer task thread."); - lastSyncedTimeMs = time.milliseconds(); - try { - while (!closing) { - maybeWaitForPartitionsAssignment(); + log.info("Starting consumer task thread."); + while (!isClosed) { + try { + if (isAssignmentChanged) { + maybeWaitForPartitionsAssignment(); + } log.trace("Polling consumer to receive remote log metadata topic records"); - ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS)); + final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollIntervalMs)); for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { processConsumerRecord(record); } - - maybeSyncCommittedDataAndOffsets(false); + maybeMarkUserPartitionsAsReady(); + } catch (final WakeupException ex) { + // ignore logging the error + isClosed = true; + } catch (final RetriableException ex) { + log.warn("Retriable error occurred while processing the records. Retrying...", ex); + } catch (final Exception ex) { + isClosed = true; + log.error("Error occurred while processing the records", ex); } - } catch (Exception e) { - log.error("Error occurred in consumer task, close:[{}]", closing, e); - } finally { - maybeSyncCommittedDataAndOffsets(true); - closeConsumer(); - log.info("Exiting from consumer task thread"); } + try { + consumer.close(Duration.ofSeconds(30)); + } catch (final Exception e) { + log.error("Error encountered while closing the consumer", e); + } + log.info("Exited from consumer task thread"); } private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) { - // Taking assignPartitionsLock here as updateAssignmentsForPartitions changes assignedTopicPartitions - // and also calls remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for the removed - // partitions. - RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); - synchronized (assignPartitionsLock) { - if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) { - remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); - } else { - log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata); - } - log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); - partitionToConsumedOffsets.put(record.partition(), record.offset()); + final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); + if (canProcess(remoteLogMetadata, record.offset())) { + remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); + readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset()); + } else { + log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata); } + log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); + readOffsetsByMetadataPartition.put(record.partition(), record.offset()); + } + + private boolean canProcess(final RemoteLogMetadata metadata, final long recordOffset) { + final TopicIdPartition tpId = metadata.topicIdPartition(); + final Long readOffset = readOffsetsByUserTopicPartition.get(tpId); + return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset); } - private void maybeSyncCommittedDataAndOffsets(boolean forceSync) { - // Return immediately if there is no consumption from last time. - boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets); - if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) { - log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync); + private void maybeMarkUserPartitionsAsReady() { + if (isAllUserTopicPartitionsInitialized) { return; } - - try { - // Need to take lock on assignPartitionsLock as assignedTopicPartitions might - // get updated by other threads. - synchronized (assignPartitionsLock) { - for (TopicIdPartition topicIdPartition : assignedTopicPartitions) { - int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition); - Long offset = partitionToConsumedOffsets.get(metadataPartition); - if (offset != null) { - remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset); - } else { - log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset", - topicIdPartition, metadataPartition); + maybeFetchBeginAndEndOffsets(); + boolean isAllInitialized = true; + for (final UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) { + if (utp.isAssigned && !utp.isInitialized) { + final Integer metadataPartition = utp.metadataPartition; + final BeginAndEndOffsetHolder holder = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition)); + // The offset-holder can be null, when the recent assignment wasn't picked up by the consumer. + if (holder != null) { + final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L); + // 1) The end-offset was fetched only once during reassignment. The metadata-partition can receive + // new stream of records, so the consumer can read records more than the last-fetched end-offset. + // 2) When the internal topic becomes empty due to breach by size/time/start-offset, then there + // are no records to read. + if (readOffset + 1 >= holder.endOffset || holder.endOffset.equals(holder.beginOffset)) { + markInitialized(utp); } } Review Comment: Added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org