abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292142026


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
-    // It indicates whether the closing process has been started or not. If it 
is set as true,
-    // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-    private volatile boolean closing = false;
-
-    // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-    // determined that the consumer needs to be assigned with the updated 
partitions.
-    private volatile boolean assignPartitions = false;
+    // It indicates whether the ConsumerTask is closed or not.
+    private volatile boolean isClosed = false;
+    // It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+    // has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+    // ones it is no longer assigned to.
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the 
assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = 
Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = 
Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> 
assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-    // may or may not have been processed based on the assigned topic 
partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> 
offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function<Optional<String>, Consumer<byte[], byte[]>> 
consumerSupplier) {

Review Comment:
   Will fix this. This refactoring was done in preparation for another change that will come 
later. For now, we don't need it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to