[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-25 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1305682200


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java:
##
@@ -153,7 +153,7 @@ private FileBasedRemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartiti
 }
 
 if (!remoteLogMetadataCache.isInitialized()) {
-throw new ResourceNotReadyException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);
+throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);

Review Comment:
   Sure, I will add it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-25 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1305505525


##
clients/src/main/java/org/apache/kafka/common/errors/ResourceNotReadyException.java:
##
@@ -0,0 +1,14 @@
+package org.apache.kafka.common.errors;
+
+public class ResourceNotReadyException extends RetriableException {

Review Comment:
   Updated the PR.






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-25 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1305501226


##
clients/src/main/java/org/apache/kafka/common/errors/ResourceNotReadyException.java:
##
@@ -0,0 +1,14 @@
+package org.apache.kafka.common.errors;
+
+public class ResourceNotReadyException extends RetriableException {

Review Comment:
   We couldn't find an existing exception that fits our case, so we can use 
`ReplicaNotAvailableException` for now. I have created a Jira ticket to 
introduce a new retriable exception/error code:
   https://issues.apache.org/jira/browse/KAFKA-15405
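
To illustrate why the exception type matters here, below is a minimal sketch of a caller that retries only on a "not ready yet" error and lets everything else propagate. It is not Kafka code: `NotReadyYetException`, `RetrySketch`, and `callWithRetry` are hypothetical names, and the exception extends `RuntimeException` only so the example compiles without the Kafka client on the classpath (the class in this PR extends `RetriableException`).

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a retriable "resource not ready" error.
// Assumption: in Kafka this role is played by a RetriableException subclass.
class NotReadyYetException extends RuntimeException {
    NotReadyYetException(String message) {
        super(message);
    }
}

class RetrySketch {
    // Calls the action up to maxAttempts times, sleeping backoffMs between attempts.
    // Only NotReadyYetException triggers a retry; any other exception propagates at once.
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        NotReadyYetException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (NotReadyYetException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        // Reached only after at least one failed attempt, so last is non-null.
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String value = callWithRetry(() -> {
            // Simulate a cache that becomes ready on the third call.
            if (++calls[0] < 3) {
                throw new NotReadyYetException("cache not initialized");
            }
            return "metadata";
        }, 5, 10L);
        System.out.println(value + " after " + calls[0] + " calls");
    }
}
```

A non-retriable exception type would instead surface to the caller as a hard failure, which is the behaviour the review discussion is trying to avoid while the cache is still loading.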






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-24 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1304505511


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java:
##
@@ -104,6 +106,22 @@ public class RemoteLogMetadataCache {
 // https://issues.apache.org/jira/browse/KAFKA-12641
 protected final ConcurrentMap leaderEpochEntries = new ConcurrentHashMap<>();
 
+private final CountDownLatch initializedLatch = new CountDownLatch(1);
+
+public void markInitialized() {
+initializedLatch.countDown();
+}
+
+public void ensureInitialized() throws InterruptedException {
+if (!initializedLatch.await(2, TimeUnit.MINUTES)) {

Review Comment:
   @showuon I have addressed this by adding a new retriable exception. Let me 
know if it needs to be renamed.
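
For readers following the thread, here is a minimal sketch of the two ways a caller can observe the latch shown in the diff above: a bounded blocking wait, and a non-blocking check that lets the caller fail fast with a retriable error instead. `InitializableCache`, `awaitInitialized`, and `LatchDemo` are hypothetical names used only for illustration; only `CountDownLatch` itself is a real JDK class, and the timeout values are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the cache in the diff; only the latch logic is shown.
class InitializableCache {
    private final CountDownLatch initializedLatch = new CountDownLatch(1);

    // Called once by the loading thread after the cache has been populated.
    void markInitialized() {
        initializedLatch.countDown();
    }

    // Non-blocking check: true once markInitialized() has been called.
    boolean isInitialized() {
        return initializedLatch.getCount() == 0;
    }

    // Blocks for at most timeoutMs. Returns false on timeout or interruption.
    boolean awaitInitialized(long timeoutMs) {
        try {
            return initializedLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        InitializableCache cache = new InitializableCache();
        // Simulate a background loader finishing its work.
        Thread loader = new Thread(cache::markInitialized);
        loader.start();
        boolean ready = cache.awaitInitialized(2_000L);
        loader.join();
        System.out.println("initialized: " + ready);
    }
}
```

The blocking variant ties up the calling thread for up to the full timeout, whereas checking `isInitialized()` and throwing a retriable exception hands the retry decision back to the caller.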






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-23 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1303224763


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+private final int numMetadataTopicPartitions = 5;
+private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+private final DummyEventHandler handler = new DummyEventHandler();
+private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed()
+.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+private final Uuid topicId = Uuid.randomUuid();
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+private ConsumerTask consumerTask;
+private MockConsumer consumer;
+private Thread thread;
+
+@BeforeEach
+public void beforeEach() {
+final Map offsets = remoteLogPartitions.stream()
+.collect(Collectors.toMap(Function.identity(), e -> 0L));
+consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+consumer.updateBeginningOffsets(offsets);
+ConsumerTask.pollIntervalMs = 10L;
+consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer);
+thread = new Thread(consumerTask);
+}
+
+@AfterEach
+public void afterEach() throws InterruptedException {
+if (thread != null) {
+consumerTask.close();
+thread.join();
+}
+}
+
+@Test
+public void testCloseOnNoAssignment() throws InterruptedException {
+thread.start();
+Thread.sleep(10);


[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-23 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1303220778


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+private final int numMetadataTopicPartitions = 5;
+private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+private final DummyEventHandler handler = new DummyEventHandler();
+private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed()
+.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+private final Uuid topicId = Uuid.randomUuid();
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+private ConsumerTask consumerTask;
+private MockConsumer consumer;
+private Thread thread;
+
+@BeforeEach
+public void beforeEach() {
+final Map offsets = remoteLogPartitions.stream()
+.collect(Collectors.toMap(Function.identity(), e -> 0L));
+consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+consumer.updateBeginningOffsets(offsets);
+consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, new SystemTime());
+thread = new Thread(consumerTask);
+}
+
+@AfterEach
+public void afterEach() throws InterruptedException {
+if (thread != null) {
+consumerTask.close();
+thread.join(10_000);
+}
+}
+
+/**
+ * Tests that the consumer task shuts down gracefully 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-21 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299873620


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +63,403 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
-
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+// The timeout for the consumer to poll records from the remote log metadata topic.
+private final long pollTimeoutMs;
 private final Time time;
 
-// It indicates whether the closing process has been started or not. If it is set as true,
-// consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-// determined that the consumer needs to be assigned with the updated partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+// The initial value is set to true to wait for partition assignment on the first execution; otherwise thread will
+// be busy without actually doing anything
+private volatile boolean hasAssignmentChanged = true;
 
 // It represents a lock for any operations related to the assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = Collections.emptySet();
+private volatile Set assignedMetadataPartitions = Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = Collections.emptySet();
+private volatile Map assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. Received consumer records
-// may or may not have been processed based on the assigned topic partitions.
-private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
+private long uninitializedAt;
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition = new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map offsetHolderByMetadataPartition = new HashMap<>();
+private boolean hasLastOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
+// The interval between retries to fetch the start and end offsets for the metadata partitions after a failed fetch.
+private final long offsetFetchRetryIntervalMs;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Consumer consumer,
+long pollTimeoutMs,
+long offsetFetchRetryIntervalMs,
+Time time) {
+this.consumer = consumer;
 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-21 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299846165


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-21 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299800248


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##


[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299369176


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +64,395 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
 private final Time time;
 
-// It indicates whether the closing process has been started or not. If it is set as true,
-// consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-// determined that the consumer needs to be assigned with the updated partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean hasAssignmentChanged = true;
 
 // It represents a lock for any operations related to the assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = Collections.emptySet();
+private volatile Set assignedMetadataPartitions = Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = Collections.emptySet();
+private volatile Map assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. Received consumer records
-// may or may not have been processed based on the assigned topic partitions.
-private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
+private long uninitializedAt;
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition = new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;

Review Comment:
   Done






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299369135


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +64,395 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
 private final Time time;
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean hasAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt;
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Consumer consumer) {
+this.consumer = consumer;
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299369043


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +64,395 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
 private final Time time;
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean hasAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt;
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Consumer consumer) {
+this.consumer = consumer;
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299368850


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +64,395 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
 private final Time time;
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean hasAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt;
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Consumer consumer) {
+this.consumer = consumer;
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299368766


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +64,395 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
 private final Time time;
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean hasAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt;
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Consumer consumer) {
+this.consumer = consumer;
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299368836


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +64,395 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
 private final Time time;
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean hasAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt;
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Consumer consumer) {
+this.consumer = consumer;
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299368720


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;

Review Comment:
   Added it as an input parameter.
   
   Also, this value is a poll timeout, not a poll interval, so I have renamed the field accordingly. Let me know if the 100 ms default does not make sense for a timeout.
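
   To illustrate the timeout-versus-interval distinction, here is a minimal, self-contained sketch. It is not the code in this PR: `PollTimeoutSketch` and `pollOnce` are hypothetical names, and a `BlockingQueue` stands in for the consumer. The point is only that the value is an upper bound on how long one poll may block (it returns immediately when a record is available), and that it is injected through the constructor rather than held in a mutable static field.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ConsumerTask; the queue plays the role of the consumer.
final class PollTimeoutSketch {
    private final BlockingQueue<String> records;
    // Injected via the constructor instead of a mutable static field,
    // so tests can pass a small value without touching shared state.
    private final long pollTimeoutMs;

    PollTimeoutSketch(BlockingQueue<String> records, long pollTimeoutMs) {
        this.records = records;
        this.pollTimeoutMs = pollTimeoutMs;
    }

    // Returns the next record, or null if none arrives within pollTimeoutMs.
    // The timeout only bounds the wait; it does not delay a poll that has data.
    String pollOnce() {
        try {
            return records.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no record".
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

   With a queue that already holds a record, `pollOnce()` returns it without waiting; with an empty queue it gives up after roughly `pollTimeoutMs` and returns `null`.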






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299368283


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
+// TODO - Update comments below
 // It indicates whether the closing process has been started or not. If it 
is set as true,
 // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
+private volatile boolean isClosed = false;
 // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
 // determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> 
consumerSupplier) {
+this.consumer = consumerSupplier.apply(Optional.empty());
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} catch (IOException e) {
-throw new KafkaException(e);
-}
-
-Map committedOffsets = Collections.emptyMap();
-try {
-// Load committed offset 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299368244


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
+// TODO - Update comments below
 // It indicates whether the closing process has been started or not. If it is set as true,
 // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-private volatile boolean closing = false;
-
+private volatile boolean isClosed = false;
 // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
 // determined that the consumer needs to be assigned with the updated partitions.
-private volatile boolean assignPartitions = false;
+private volatile boolean isAssignmentChanged = true;

Review Comment:
   Added a note in the comment






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-20 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299367611


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +64,395 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
 private final Time time;
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean hasAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt;
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();

Review Comment:
   Done
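
The two `volatile` flags in the hunk above (`isClosed`, `hasAssignmentChanged`) follow a common pattern: other threads flip a flag and the polling thread notices it on its next iteration. A minimal sketch of that pattern, with no Kafka dependencies, might look as follows. `FlagLoop`, `pollOnce`, and the `reassignments` counter are hypothetical names used only for illustration and are not part of the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a polling task that is driven by two volatile flags.
final class FlagLoop implements Runnable, AutoCloseable {
    private final long pollIntervalMs;
    private volatile boolean isClosed = false;
    private volatile boolean hasAssignmentChanged = true;
    private final AtomicInteger reassignments = new AtomicInteger();

    FlagLoop(long pollIntervalMs) {
        this.pollIntervalMs = pollIntervalMs;
    }

    // Called by other threads when the assignment is updated.
    void markAssignmentChanged() {
        hasAssignmentChanged = true;
    }

    int reassignments() {
        return reassignments.get();
    }

    // One loop iteration, split out so it can be exercised without starting a thread.
    // Returns false once the task has been closed.
    boolean pollOnce() {
        if (isClosed) {
            return false;
        }
        if (hasAssignmentChanged) {
            // Clear the flag before acting on it, so a change that arrives
            // while we are working is picked up by the next iteration.
            hasAssignmentChanged = false;
            reassignments.incrementAndGet();
        }
        return true;
    }

    @Override
    public void run() {
        while (pollOnce()) {
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void close() {
        isClosed = true;
    }
}
```

Because the flags are only ever written as whole values, `volatile` is enough for visibility here; state that has to be updated together with the flag would still need a lock such as the `assignPartitionsLock` shown in the diff.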






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292200886


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> 
consumerSupplier) {

Review Comment:
   done
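
The new constructor takes a `consumerSupplier` function rather than a ready-made `KafkaConsumer`, and the test quoted elsewhere in this thread passes `ignored -> consumer` to hand in a `MockConsumer`. A minimal sketch of that injection style, without Kafka dependencies, is shown below. The archive has stripped the generic type parameters from the diff, so `Optional<String>` as the argument type is an assumption, and `RecordSource` and `SupplierBackedTask` are hypothetical names.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical minimal record source standing in for a Kafka consumer.
interface RecordSource {
    String poll();
}

// Hypothetical task that obtains its source from a supplier function.
final class SupplierBackedTask {
    private final RecordSource source;

    // The Optional<String> argument type is an assumption of this sketch;
    // the diff only shows that the supplier is applied to Optional.empty().
    SupplierBackedTask(Function<Optional<String>, RecordSource> sourceSupplier) {
        this.source = sourceSupplier.apply(Optional.empty());
    }

    String readOne() {
        return source.poll();
    }
}
```

A test can then supply a fake with `new SupplierBackedTask(ignored -> fake)`, which mirrors the shape of the `ConsumerTaskTest` setup.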



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -357,7 +357,9 @@ public void configure(Map configs) {
 log.info("Started configuring topic-based RLMM with configs: {}", 
configs);
 
 rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
-rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
+if 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292200608


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();

Review Comment:
   done
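
The hunk above replaces the constructor-injected `Time` with a hard-coded `new SystemTime()`. As a minimal sketch of why an injected time source is convenient for fields such as `uninitializedAt`, the following uses a plain `LongSupplier` in place of Kafka's `Time`. `ClockedTask` and `uninitializedLongerThan` are hypothetical names and are not part of the PR.

```java
import java.util.function.LongSupplier;

// Hypothetical task that remembers when it was last known to be uninitialized.
final class ClockedTask {
    private final LongSupplier clockMs; // injected time source (assumption of this sketch)
    private final long uninitializedAt;

    ClockedTask(LongSupplier clockMs) {
        this.clockMs = clockMs;
        this.uninitializedAt = clockMs.getAsLong();
    }

    // True once more than thresholdMs has elapsed since construction.
    boolean uninitializedLongerThan(long thresholdMs) {
        return clockMs.getAsLong() - uninitializedAt > thresholdMs;
    }
}
```

With the clock injected, a test can advance time by updating a counter instead of sleeping. Production code would pass something like `System::currentTimeMillis`.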






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292197975


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -357,7 +357,9 @@ public void configure(Map configs) {
 log.info("Started configuring topic-based RLMM with configs: {}", 
configs);
 
 rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
-rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
+if (rlmmTopicPartitioner == null) {

Review Comment:
   Yes, makes sense
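
The `if (rlmmTopicPartitioner == null)` guard in `configure()` means a partitioner that was set earlier is kept, and the default is only built when nothing was supplied. A minimal sketch of that guard, assuming a setter that can be called before `configure()`, is given below. `SketchPartitioner`, `SketchManager`, `setPartitioner`, and the `"partitions"` config key are hypothetical and are not taken from the PR.

```java
import java.util.Map;

// Hypothetical stand-in for a partitioner; only the partition count matters here.
final class SketchPartitioner {
    private final int numPartitions;

    SketchPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int numPartitions() {
        return numPartitions;
    }
}

// Hypothetical manager that mirrors the null guard shown in the diff.
final class SketchManager {
    private SketchPartitioner partitioner;

    // Assumed hook: lets a caller (for example a test) supply a partitioner before configure().
    void setPartitioner(SketchPartitioner partitioner) {
        this.partitioner = partitioner;
    }

    void configure(Map<String, Integer> configs) {
        // Build the default only when nothing was injected beforehand.
        if (partitioner == null) {
            partitioner = new SketchPartitioner(configs.getOrDefault("partitions", 50));
        }
    }

    SketchPartitioner partitioner() {
        return partitioner;
    }
}
```

Without the guard, `configure()` would silently overwrite an injected instance, which is the behavior the changed lines avoid.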






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292156303


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+private final int numMetadataTopicPartitions = 5;
+private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+private final DummyEventHandler handler = new DummyEventHandler();
+private final Set remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+private final Uuid topicId = Uuid.randomUuid();
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+private ConsumerTask consumerTask;
+private MockConsumer consumer;
+private Thread thread;
+
+@BeforeEach
+public void beforeEach() {
+final Map offsets = remoteLogPartitions.stream()
+.collect(Collectors.toMap(Function.identity(), e -> 0L));
+consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+consumer.updateBeginningOffsets(offsets);
+ConsumerTask.pollIntervalMs = 10L;
+consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+thread = new Thread(consumerTask);
+}
+
+@AfterEach
+public void afterEach() throws InterruptedException {
+if (thread != null) {
+consumerTask.close();
+thread.join(10_000);
+}
+}
+
+/**
+ * Tests that the consumer task shuts down gracefully when there were no 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292162273


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -357,7 +357,9 @@ public void configure(Map configs) {
 log.info("Started configuring topic-based RLMM with configs: {}", 
configs);
 
 rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
-rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
+if (rlmmTopicPartitioner == null) {
+rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());

Review Comment:
   Done



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -515,6 +517,11 @@ public void startConsumerThread() {
 }
 }

Review Comment:
   done






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292162212


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> 
consumerSupplier) {
+this.consumer = consumerSupplier.apply(Optional.empty());
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
- 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292158885


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> 
consumerSupplier) {
+this.consumer = consumerSupplier.apply(Optional.empty());
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
- 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292156303


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+private final int numMetadataTopicPartitions = 5;
+private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+private final DummyEventHandler handler = new DummyEventHandler();
+private final Set remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+private final Uuid topicId = Uuid.randomUuid();
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+private ConsumerTask consumerTask;
+private MockConsumer consumer;
+private Thread thread;
+
+@BeforeEach
+public void beforeEach() {
+final Map offsets = remoteLogPartitions.stream()
+.collect(Collectors.toMap(Function.identity(), e -> 0L));
+consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+consumer.updateBeginningOffsets(offsets);
+ConsumerTask.pollIntervalMs = 10L;
+consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer);
+thread = new Thread(consumerTask);
+}
+
+@AfterEach
+public void afterEach() throws InterruptedException {
+if (thread != null) {
+consumerTask.close();
+thread.join(10_000);
+}
+}
+
+/**
+ * Tests that the consumer task shuts down gracefully when there were no 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1291234396


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+private final int numMetadataTopicPartitions = 5;
+private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+private final DummyEventHandler handler = new DummyEventHandler();
+private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed()
+.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+private final Uuid topicId = Uuid.randomUuid();
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+private ConsumerTask consumerTask;
+private MockConsumer consumer;
+private Thread thread;
+
+@BeforeEach
+public void beforeEach() {
+final Map offsets = remoteLogPartitions.stream()
+.collect(Collectors.toMap(Function.identity(), e -> 0L));
+consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+consumer.updateBeginningOffsets(offsets);
+ConsumerTask.pollIntervalMs = 10L;
+consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer);
+thread = new Thread(consumerTask);
+}
+
+@AfterEach
+public void afterEach() throws InterruptedException {
+if (thread != null) {
+consumerTask.close();
+thread.join(10_000);
+}
+}
+
+/**
+ * Tests that the consumer task shuts down gracefully when there were no 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292149616


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java:
##
@@ -185,4 +190,15 @@ public void maybeLoadPartition(TopicIdPartition partition) {
 topicIdPartition -> new FileBasedRemoteLogMetadataCache(topicIdPartition, partitionLogDirectory(topicIdPartition.topicPartition())));
 }

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292149012


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
-// It indicates whether the closing process has been started or not. If it is set as true,
-// consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-// determined that the consumer needs to be assigned with the updated partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean isAssignmentChanged = true;

Review Comment:
   Done






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292142083


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
-// It indicates whether the closing process has been started or not. If it is set as true,
-// consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-// determined that the consumer needs to be assigned with the updated partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = Collections.emptySet();
+private volatile Set assignedMetadataPartitions = Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = Collections.emptySet();
+private volatile Map assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. Received consumer records
-// may or may not have been processed based on the assigned topic partitions.
-private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition = new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,

Review Comment:
   We want to keep the implementation simple for now, without a checkpointing
mechanism.
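
To illustrate what dropping the checkpoint file implies, here is a minimal sketch, not the actual ConsumerTask code. The class `InMemoryOffsetTracker` and its methods are hypothetical names; only the idea of a map from metadata partition to read offset comes from the diff above. The sketch assumes that, with no checkpoint on disk, a fresh instance resumes every partition from offset 0.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; this is not the ConsumerTask implementation.
class InMemoryOffsetTracker {
    // Metadata partition number -> last offset that was read. Kept in memory only.
    private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>();

    // A record is new if nothing was read yet for its partition,
    // or if its offset is beyond the last one recorded.
    boolean shouldProcess(int partition, long offset) {
        Long last = readOffsetsByMetadataPartition.get(partition);
        return last == null || offset > last;
    }

    // Remember the highest offset seen for the partition.
    void markRead(int partition, long offset) {
        readOffsetsByMetadataPartition.merge(partition, offset, Long::max);
    }

    // Assumption: without a checkpoint file, an unknown partition restarts at offset 0.
    long resumeOffset(int partition) {
        Long last = readOffsetsByMetadataPartition.get(partition);
        return last == null ? 0L : last + 1;
    }

    public static void main(String[] args) {
        InMemoryOffsetTracker tracker = new InMemoryOffsetTracker();
        tracker.markRead(0, 5L);
        System.out.println("resume partition 0 at " + tracker.resumeOffset(0));
        System.out.println("fresh tracker resumes at " + new InMemoryOffsetTracker().resumeOffset(0));
    }
}
```

The trade-off in this sketch is that a restart re-reads the metadata partition from the beginning, in exchange for having no file format or sync interval to maintain.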






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292142026


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
-// It indicates whether the closing process has been started or not. If it is set as true,
-// consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-// determined that the consumer needs to be assigned with the updated partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = Collections.emptySet();
+private volatile Set assignedMetadataPartitions = Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = Collections.emptySet();
+private volatile Map assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. Received consumer records
-// may or may not have been processed based on the assigned topic partitions.
-private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition = new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> consumerSupplier) {

Review Comment:
   Will fix this. The refactoring was made for another change that will come
later. For now, we don't need this.
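
For readers following the thread, the supplier-style constructor under discussion can be sketched without any Kafka dependency. Everything below is hypothetical: `RecordSource` stands in for a consumer interface, `SketchTask` for the task, and the `String` argument to the supplier is an assumed placeholder, since the real type parameters are not visible in the quoted diff. The sketch only shows why a factory function makes it easy for a test to pass `ignored -> fake`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical minimal abstraction standing in for a consumer interface.
interface RecordSource {
    List<String> poll();
}

// Hypothetical task that obtains its source from an injected factory.
class SketchTask {
    private final RecordSource source;
    private final List<String> handled = new ArrayList<>();

    // The String argument is an assumed placeholder (for example a client id).
    SketchTask(Function<String, RecordSource> sourceSupplier) {
        this.source = sourceSupplier.apply("sketch-client");
    }

    // Poll once and record whatever was returned.
    void pollOnce() {
        handled.addAll(source.poll());
    }

    List<String> handled() {
        return handled;
    }
}

// Test double that returns queued batches, then empty lists.
class FakeSource implements RecordSource {
    private final Queue<List<String>> batches = new ArrayDeque<>();

    void addBatch(String... records) {
        batches.add(Arrays.asList(records));
    }

    @Override
    public List<String> poll() {
        List<String> batch = batches.poll();
        return batch == null ? Collections.emptyList() : batch;
    }
}

class SupplierInjectionDemo {
    public static void main(String[] args) {
        FakeSource fake = new FakeSource();
        fake.addBatch("a", "b");
        SketchTask task = new SketchTask(ignored -> fake);
        task.pollOnce();
        System.out.println(task.handled());
    }
}
```

Passing the source instance directly to the constructor would serve a test equally well; the factory form only pays off when the task needs to create the source itself.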






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-12 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1292141846


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;

Review Comment:
   It will take longer for the tests to run with higher poll intervals. That is 
the reason we want to override it in tests.
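
A minimal sketch of the pattern being described, under stated assumptions: `PollingTask` and `PollIntervalOverrideDemo` are hypothetical stand-ins, not the real ConsumerTask or its test. It shows a non-final, package-private static interval that a test shortens, and it restores the previous value afterwards, which is an addition of this sketch so the override does not leak into other tests.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a polling task; this is not the real ConsumerTask.
class PollingTask implements Runnable {
    // Package-private and non-final so that a test in the same package can shorten it.
    static long pollIntervalMs = 100L;

    private volatile boolean closed = false;
    private final AtomicInteger polls = new AtomicInteger();

    @Override
    public void run() {
        while (!closed) {
            polls.incrementAndGet(); // stands in for a consumer poll
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void close() {
        closed = true;
    }

    int pollCount() {
        return polls.get();
    }
}

class PollIntervalOverrideDemo {
    // Runs a task with a temporary poll interval until it has polled at least
    // minPolls times, then restores the previous interval. Returns the poll count.
    static int runWithInterval(long intervalMs, int minPolls) {
        long previous = PollingTask.pollIntervalMs;
        PollingTask.pollIntervalMs = intervalMs;
        PollingTask task = new PollingTask();
        Thread thread = new Thread(task);
        try {
            thread.start();
            while (task.pollCount() < minPolls) {
                Thread.sleep(1);
            }
            task.close();
            thread.join(10_000);
            return task.pollCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for polls", e);
        } finally {
            task.close();
            PollingTask.pollIntervalMs = previous; // do not leak the override
        }
    }

    public static void main(String[] args) {
        int polls = runWithInterval(10L, 3);
        System.out.println("polls=" + polls + ", restored interval=" + PollingTask.pollIntervalMs);
    }
}
```

A mutable static is shared by every instance in the JVM, so tests that run in parallel could interfere with each other. Passing the interval through the constructor is a common alternative if that becomes a problem.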






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-11 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1291235847


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+
+import kafka.utils.EmptyTestInfo;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
+
+private static final int SEG_SIZE = 1024 * 1024;
+
+private final Time time = new MockTime(1);
+private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();
+
+private TopicBasedRemoteLogMetadataManager rlmm() {
+return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+}
+
+@BeforeEach
+public void setup() {
+// Start the cluster only.
+remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
+}
+
+@AfterEach
+public void teardown() throws IOException {
+remoteLogMetadataManagerHarness.close();
+}
+
+@Test
+public void testMultiplePartitionSubscriptions() throws Exception {
+// Create topics.
+String leaderTopic = "leader";
+HashMap> assignedLeaderTopicReplicas = new HashMap<>();
+List leaderTopicReplicas = new ArrayList<>();
+// Set broker id 0 as the first entry which is taken as the leader.
+leaderTopicReplicas.add(0);
+leaderTopicReplicas.add(1);
+leaderTopicReplicas.add(2);
+assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas));
+remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
+JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
+remoteLogMetadataManagerHarness.listenerName());
+
+String followerTopic = "follower";
+HashMap> assignedFollowerTopicReplicas = new HashMap<>();
+List followerTopicReplicas = new ArrayList<>();
+// Set broker id 1 as the first entry which is taken as the leader.
+followerTopicReplicas.add(1);
+followerTopicReplicas.add(2);
+followerTopicReplicas.add(0);
+assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas));
+remoteLogMetadataManagerHarness.createTopicWithAssignment(
+followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
+remoteLogMetadataManagerHarness.listenerName());
+remoteLogMetadataManagerHarness.listenerName());
+
+String topicWithNoMessages = "no-messages-topic";
+HashMap> assignedTopicReplicas = new HashMap<>();
+List noMessagesTopicReplicas = new ArrayList<>();
+// Set broker id 1 as the first entry which is taken as the leader.
+noMessagesTopicReplicas.add(1);
+

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-11 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1291234396


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join(10_000);
+        }
+    }
+
+    /**
+     * Tests that the consumer task shuts down gracefully when there were no

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-08 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286634284


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();
+        }
+    }
+
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);
+

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-08 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286633888


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-08 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286633719


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-08 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286632651


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-08 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286632214


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@


[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286091923


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286090883


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+private final int numMetadataTopicPartitions = 5;
+private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+private final DummyEventHandler handler = new DummyEventHandler();
+private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed()
+.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+private final Uuid topicId = Uuid.randomUuid();
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+private ConsumerTask consumerTask;
+private MockConsumer consumer;
+private Thread thread;
+
+@BeforeEach
+public void beforeEach() {
+final Map offsets = remoteLogPartitions.stream()
+.collect(Collectors.toMap(Function.identity(), e -> 0L));
+consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+consumer.updateBeginningOffsets(offsets);
+ConsumerTask.pollIntervalMs = 10L;
+consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer);
+thread = new Thread(consumerTask);
+}
+
+@AfterEach
+public void afterEach() throws InterruptedException {
+if (thread != null) {
+consumerTask.close();
+thread.join();
+}
+}
+
+@Test
+public void testCloseOnNoAssignment() throws InterruptedException {
+thread.start();
+Thread.sleep(10);
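
The test setup quoted above hands `ConsumerTask` a lambda (`ignored -> consumer`) rather than a concrete consumer, which lets the task run against a `MockConsumer`. A minimal sketch of that injection style, written without Kafka on the classpath. `RecordSource`, `FetchTask`, and `FetchTaskDemo` are hypothetical names that do not appear in the PR, and the meaning of the `Optional` argument is an assumption, since the quoted hunk does not show how it is used:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical minimal record source; stands in for a real consumer client.
interface RecordSource {
    List<String> poll();
}

// Hypothetical task that obtains its source from a factory instead of constructing it itself.
class FetchTask {
    private final RecordSource source;
    private final List<String> seen = new ArrayList<>();

    FetchTask(Function<Optional<String>, RecordSource> sourceFactory) {
        // Assumption: the Optional carries an optional naming hint; none is supplied here.
        this.source = sourceFactory.apply(Optional.empty());
    }

    // Polls the injected source once and remembers what it returned.
    void runOnce() {
        seen.addAll(source.poll());
    }

    List<String> seen() {
        return seen;
    }
}

class FetchTaskDemo {
    public static void main(String[] args) {
        // A fake source that always returns the same two records.
        RecordSource fake = () -> Arrays.asList("a", "b");
        FetchTask task = new FetchTask(ignored -> fake);
        task.runOnce();
        System.out.println(task.seen()); // prints [a, b]
    }
}
```

Because the factory is the only place a real client would be created, a test can swap in a fake without touching the task's logic.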


[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286089506


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
+// TODO - Update comments below
 // It indicates whether the closing process has been started or not. If it is set as true,
 // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-private volatile boolean closing = false;
-
+private volatile boolean isClosed = false;
 // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
 // determined that the consumer needs to be assigned with the updated partitions.
-private volatile boolean assignPartitions = false;
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = Collections.emptySet();
+private volatile Set assignedMetadataPartitions = Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = Collections.emptySet();
+private volatile Map assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. Received consumer records
-// may or may not have been processed based on the assigned topic partitions.
-private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition = new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> consumerSupplier) {
+this.consumer = consumerSupplier.apply(Optional.empty());
 this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
-} catch (IOException e) {
-throw new KafkaException(e);
-}
-
-Map committedOffsets = Collections.emptyMap();
-try {
-// Load committed offset 
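
The `isClosed` field in the hunk above is documented as the signal that stops the task from consuming further messages. A minimal sketch of that volatile-flag shutdown pattern, independent of Kafka. `PollLoopTask` and `PollLoopDemo` are hypothetical names, and the loop body is a stand-in for real polling, not the logic of `ConsumerTask`:

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a consumer-driven task; not the Kafka ConsumerTask.
class PollLoopTask implements Runnable, Closeable {
    private final Runnable pollOnce;   // one unit of work per loop iteration
    private final long pollIntervalMs; // pause between iterations

    // volatile so that a close() from another thread becomes visible to the loop thread
    private volatile boolean isClosed = false;

    PollLoopTask(Runnable pollOnce, long pollIntervalMs) {
        this.pollOnce = pollOnce;
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void run() {
        while (!isClosed) {
            pollOnce.run();
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt status and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void close() {
        // Only flips the flag; the loop exits at its next check.
        isClosed = true;
    }
}

class PollLoopDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger polls = new AtomicInteger();
        PollLoopTask task = new PollLoopTask(polls::incrementAndGet, 10L);
        Thread thread = new Thread(task);
        thread.start();
        Thread.sleep(50);
        task.close();
        thread.join(); // returns once the loop has observed the flag
        System.out.println("loop stopped after " + polls.get() + " polls");
    }
}
```

This mirrors the `close()` followed by `thread.join()` sequence used in the test's `afterEach`: the join can only return promptly if the loop re-checks the flag on every iteration.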

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-06 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1285212779


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java:
##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+
+import kafka.utils.EmptyTestInfo;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
+
+private static final int SEG_SIZE = 1024 * 1024;
+
+private final Time time = new MockTime(1);
+private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();
+
+private TopicBasedRemoteLogMetadataManager rlmm() {
+return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+}
+
+@BeforeEach
+public void setup() {
+// Start the cluster only.
+remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
+}
+
+@AfterEach
+public void teardown() throws IOException {
+remoteLogMetadataManagerHarness.close();
+}
+
+@Test
+public void testMultiplePartitionSubscriptions() throws Exception {
+// Create topics.
+String leaderTopic = "leader";
+HashMap> assignedLeaderTopicReplicas = new HashMap<>();
+List leaderTopicReplicas = new ArrayList<>();
+// Set broker id 0 as the first entry which is taken as the leader.
+leaderTopicReplicas.add(0);
+leaderTopicReplicas.add(1);
+leaderTopicReplicas.add(2);
+assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas));
+remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
+JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
+remoteLogMetadataManagerHarness.listenerName());
+
+String followerTopic = "follower";
+HashMap> assignedFollowerTopicReplicas = new HashMap<>();
+List followerTopicReplicas = new ArrayList<>();
+// Set broker id 1 as the first entry which is taken as the leader.
+followerTopicReplicas.add(1);
+followerTopicReplicas.add(2);
+followerTopicReplicas.add(0);
+assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas));
+remoteLogMetadataManagerHarness.createTopicWithAssignment(
+followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
+remoteLogMetadataManagerHarness.listenerName());
+
+String topicWithNoMessages = "no-messages-topic";
+HashMap> assignedTopicReplicas = new HashMap<>();
+List noMessagesTopicReplicas = new ArrayList<>();
+// Set broker id 1 as the first entry which is taken as the leader.
+noMessagesTopicReplicas.add(1);
+

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-06 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1285212684


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-06 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1285212647


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-06 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1285212583


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-06 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1285212542


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-06 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1285212375


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
+// TODO - Update comments below
 // It indicates whether the closing process has been started or not. If it 
is set as true,
 // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
+private volatile boolean isClosed = false;
 // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
 // determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> 
consumerSupplier) {
+this.consumer = consumerSupplier.apply(Optional.empty());
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} catch (IOException e) {
-throw new KafkaException(e);
-}
-
-Map committedOffsets = Collections.emptyMap();
-try {
-// Load committed offset 

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-06 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1285204749


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java:
##
@@ -104,6 +106,22 @@ public class RemoteLogMetadataCache {
 // https://issues.apache.org/jira/browse/KAFKA-12641
 protected final ConcurrentMap leaderEpochEntries = new ConcurrentHashMap<>();
 
+private final CountDownLatch initializedLatch = new CountDownLatch(1);
+
+public void markInitialized() {
+initializedLatch.countDown();
+}
+
+public void ensureInitialized() throws InterruptedException {
+if (!initializedLatch.await(2, TimeUnit.MINUTES)) {

Review Comment:
   In large clusters, we have seen initialization take more than a minute, 
which is why we set the timeout to 2 minutes. We could also make this 
configurable. Thoughts? 
   cc @kamalcph 
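
A minimal sketch of the configurable variant floated above. `InitializationGate` and its `timeoutMs` constructor argument are hypothetical names, not part of the PR. Unlike the PR's `ensureInitialized()`, this version reports a timeout through its return value and restores the interrupt flag rather than throwing, which is only one possible design.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not in the PR): a one-shot gate whose wait time is
// supplied by the caller instead of being hard-coded to 2 minutes.
final class InitializationGate {
    private final CountDownLatch initializedLatch = new CountDownLatch(1);
    private final long timeoutMs;

    InitializationGate(long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeoutMs must be non-negative");
        }
        this.timeoutMs = timeoutMs;
    }

    // Called once by whichever thread finishes loading the cache.
    void markInitialized() {
        initializedLatch.countDown();
    }

    boolean isInitialized() {
        return initializedLatch.getCount() == 0;
    }

    // Returns true if the gate opened within timeoutMs, false on timeout
    // or if the waiting thread was interrupted.
    boolean awaitInitialized() {
        try {
            return initializedLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }
}
```

A caller could build the gate from a broker-level setting and translate a `false` result into whatever retriable exception the cache lookup is expected to throw.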






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-06 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1285196135


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
+// TODO - Update comments below
 // It indicates whether the closing process has been started or not. If it is set as true,
 // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-private volatile boolean closing = false;
-
+private volatile boolean isClosed = false;
 // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
 // determined that the consumer needs to be assigned with the updated partitions.
-private volatile boolean assignPartitions = false;
+private volatile boolean isAssignmentChanged = true;

Review Comment:
   This is required for the first time the ConsumerTask thread starts running. 
If the default value were false, the ConsumerTask would spin in the while loop 
(Line 116) without doing any work until some assignment is received. Setting 
the default value to true avoids that unnecessary looping and makes the task 
wait for an assignment.
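
A rough sketch of the behaviour described above, under stated assumptions: `AssignmentWaiter`, `assign`, and `awaitAssignment` are hypothetical names, partitions are reduced to plain integers, and the real `ConsumerTask` loop may be structured differently. It only illustrates why starting the flag at `true` parks the thread on the lock until partitions arrive, whereas starting at `false` would let the first iterations fall straight through with nothing to poll.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the assignment handshake; not the PR's code.
final class AssignmentWaiter {
    private final Object lock = new Object();
    // Starts as true so the very first loop iteration blocks until partitions arrive.
    private volatile boolean assignmentChanged = true;
    private volatile boolean closed = false;
    private volatile Set<Integer> assigned = Collections.emptySet();

    void assign(Set<Integer> partitions) {
        synchronized (lock) {
            assigned = Collections.unmodifiableSet(new HashSet<>(partitions));
            assignmentChanged = true;
            lock.notifyAll(); // wake the task thread if it is parked in awaitAssignment()
        }
    }

    void close() {
        synchronized (lock) {
            closed = true;
            lock.notifyAll();
        }
    }

    // Called at the top of each loop iteration. Returns the partitions to poll,
    // or an empty set if the task was closed or interrupted while waiting.
    Set<Integer> awaitAssignment() {
        if (assignmentChanged) {
            synchronized (lock) {
                try {
                    while (assigned.isEmpty() && !closed) {
                        lock.wait(); // no busy spinning: sleep until assign() or close()
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return Collections.emptySet();
                }
                assignmentChanged = false;
            }
        }
        return closed ? Collections.<Integer>emptySet() : assigned;
    }
}
```

With the flag initialised to `false` and no assignment yet, `awaitAssignment()` would skip the synchronized block and return an empty set immediately, so a surrounding `while` loop would keep iterating without doing anything.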






[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-06 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1285156707


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
+// TODO - Update comments below
 // It indicates whether the closing process has been started or not. If it 
is set as true,
 // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
+private volatile boolean isClosed = false;
 // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
 // determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> 
consumerSupplier) {
+this.consumer = consumerSupplier.apply(Optional.empty());
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} catch (IOException e) {
-throw new KafkaException(e);
-}
-
-Map committedOffsets = Collections.emptyMap();
-try {
-// Load committed offset