This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6f197301646 KAFKA-9579 Fetch implementation for records in the remote
storage through a specific purgatory. (#13535)
6f197301646 is described below
commit 6f197301646135e0bb39a461ca0a07c09c3185fb
Author: Satish Duggana <[email protected]>
AuthorDate: Thu May 18 06:37:37 2023 +0530
KAFKA-9579 Fetch implementation for records in the remote storage through a
specific purgatory. (#13535)
This change includes
- Recognize the fetch requests with out of range local log offsets
- Add fetch implementation for the data lying in the range of
[logStartOffset, localLogStartOffset]
- Add a new purgatory for async remote read requests which are served
through a specific thread pool
We have an extended version of remote fetch that can fetch from multiple
remote partitions in parallel, which we will raise as a followup PR.
A few tests for the newly introduced changes are added in this PR. More tests
for these scenarios exist in 2.8.x; they need refactoring against the trunk
changes and will be added in followup PRs.
Other contributors:
Kamal Chandraprakash <[email protected]> - Further improvements and
adding a few tests
Luke Chen <[email protected]> - Added a few test cases for these changes.
PS: This functionality is pulled out from internal branches that carry other
functionality related to the feature in 2.8.x. Not all of those changes are
pulled in here because that would make the PR huge and burdensome to review,
and it would also need other metrics, minor enhancements (including perf), and
minor changes done for tests. So, we will try to have followup PRs to cover
all those.
Reviewers: Jun Rao <[email protected]>, Alexandre Dupriez
<[email protected]>, Divij Vaidya <[email protected]>, Jorge Esteban
Quilcate Otoya <[email protected]>
---
checkstyle/suppressions.xml | 1 +
.../java/kafka/log/remote/RemoteLogManager.java | 298 ++++++++++++++++++---
.../java/kafka/log/remote/RemoteLogReader.java | 70 +++++
.../server/builders/ReplicaManagerBuilder.java | 8 +
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 1 +
.../src/main/scala/kafka/server/DelayedFetch.scala | 2 +-
.../scala/kafka/server/DelayedRemoteFetch.scala | 116 ++++++++
core/src/main/scala/kafka/server/KafkaServer.scala | 1 +
.../main/scala/kafka/server/ReplicaManager.scala | 259 ++++++++++++++----
.../kafka/log/remote/RemoteLogManagerTest.java | 8 +-
.../java/kafka/log/remote/RemoteLogReaderTest.java | 82 ++++++
.../kafka/server/DelayedFetchTest.scala | 2 +-
.../kafka/server/DelayedRemoteFetchTest.scala | 175 ++++++++++++
.../unit/kafka/server/ListOffsetsRequestTest.scala | 15 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 10 +-
.../kafka/storage/internals/log/FetchDataInfo.java | 10 +
.../storage/internals/log/RemoteLogReadResult.java | 30 +++
.../internals/log/RemoteStorageFetchInfo.java | 53 ++++
.../internals/log/RemoteStorageThreadPool.java | 73 +++++
20 files changed, 1107 insertions(+), 109 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 9cbb54b5cf3..2084490d187 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,6 +39,7 @@
<suppress
checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity"
files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
+ <suppress checks="NPathComplexity|ClassFanOutComplexity"
files="RemoteLogManager.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 1048cd5020f..4c07451236f 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -24,10 +24,14 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RemoteLogInputStream;
+import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.ChildFirstClassLoader;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
@@ -46,7 +50,18 @@ import
org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import
org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.FetchIsolation;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.OffsetIndex;
+import org.apache.kafka.storage.internals.log.OffsetPosition;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
+import org.apache.kafka.storage.internals.log.TransactionIndex;
+import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -65,6 +80,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
@@ -76,6 +92,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -97,6 +114,7 @@ import java.util.stream.Stream;
public class RemoteLogManager implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(RemoteLogManager.class);
+ private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX =
"remote-log-reader";
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
@@ -109,7 +127,7 @@ public class RemoteLogManager implements Closeable {
private final RemoteLogMetadataManager remoteLogMetadataManager;
private final RemoteIndexCache indexCache;
-
+ private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
private final RLMScheduledThreadPool rlmScheduledThreadPool;
private final long delayInMs;
@@ -147,6 +165,11 @@ public class RemoteLogManager implements Closeable {
indexCache = new RemoteIndexCache(1024, remoteLogStorageManager,
logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+ remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
+ REMOTE_LOG_READER_THREAD_NAME_PREFIX,
+ rlmConfig.remoteLogReaderThreads(),
+ rlmConfig.remoteLogReaderMaxPendingTasks()
+ );
}
private <T> T createDelegate(ClassLoader classLoader, String className) {
@@ -447,7 +470,7 @@ public class RemoteLogManager implements Closeable {
leaderEpoch = -1;
}
- private void maybeUpdateReadOffset() throws RemoteStorageException {
+ private void maybeUpdateReadOffset(UnifiedLog log) throws
RemoteStorageException {
if (!copiedOffsetOption.isPresent()) {
logger.info("Find the highest remote offset for partition: {}
after becoming leader, leaderEpoch: {}", topicIdPartition, leaderEpoch);
@@ -455,23 +478,17 @@ public class RemoteLogManager implements Closeable {
// of a segment with that epoch copied into remote storage. If
it can not find an entry then it checks for the
// previous leader epoch till it finds an entry, If there are
no entries till the earliest leader epoch in leader
// epoch cache then it starts copying the segments from the
earliest epoch entry's offset.
- copiedOffsetOption =
OptionalLong.of(findHighestRemoteOffset(topicIdPartition));
+ copiedOffsetOption =
OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
}
}
- public void copyLogSegmentsToRemote() throws InterruptedException {
+ public void copyLogSegmentsToRemote(UnifiedLog log) throws
InterruptedException {
if (isCancelled())
return;
try {
- maybeUpdateReadOffset();
+ maybeUpdateReadOffset(log);
long copiedOffset = copiedOffsetOption.getAsLong();
- Optional<UnifiedLog> maybeLog =
fetchLog.apply(topicIdPartition.topicPartition());
- if (!maybeLog.isPresent()) {
- return;
- }
-
- UnifiedLog log = maybeLog.get();
// LSO indicates the offset below are ready to be consumed
(high-watermark or committed)
long lso = log.lastStableOffset();
@@ -578,9 +595,15 @@ public class RemoteLogManager implements Closeable {
return;
try {
+ Optional<UnifiedLog> unifiedLogOptional =
fetchLog.apply(topicIdPartition.topicPartition());
+
+ if (!unifiedLogOptional.isPresent()) {
+ return;
+ }
+
if (isLeader()) {
// Copy log segments to remote storage
- copyLogSegmentsToRemote();
+ copyLogSegmentsToRemote(unifiedLogOptional.get());
}
} catch (InterruptedException ex) {
if (!isCancelled()) {
@@ -600,25 +623,212 @@ public class RemoteLogManager implements Closeable {
}
}
- long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws
RemoteStorageException {
- Optional<Long> offset = Optional.empty();
- Optional<UnifiedLog> maybeLog =
fetchLog.apply(topicIdPartition.topicPartition());
- if (maybeLog.isPresent()) {
- UnifiedLog log = maybeLog.get();
- Option<LeaderEpochFileCache> maybeLeaderEpochFileCache =
log.leaderEpochCache();
- if (maybeLeaderEpochFileCache.isDefined()) {
- LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
- OptionalInt epoch = cache.latestEpoch();
- while (!offset.isPresent() && epoch.isPresent()) {
- offset =
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition,
epoch.getAsInt());
- epoch = cache.previousEpoch(epoch.getAsInt());
+ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo)
throws RemoteStorageException, IOException {
+ int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+ TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+ FetchRequest.PartitionData fetchInfo =
remoteStorageFetchInfo.fetchInfo;
+
+ boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation ==
FetchIsolation.TXN_COMMITTED;
+
+ long offset = fetchInfo.fetchOffset;
+ int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+ Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+ OptionalInt epoch = OptionalInt.empty();
+
+ if (logOptional.isPresent()) {
+ Option<LeaderEpochFileCache> leaderEpochCache =
logOptional.get().leaderEpochCache();
+ if (leaderEpochCache.isDefined()) {
+ epoch = leaderEpochCache.get().epochForOffset(offset);
+ }
+ }
+
+ Optional<RemoteLogSegmentMetadata> rlsMetadataOptional =
epoch.isPresent()
+ ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+ : Optional.empty();
+
+ if (!rlsMetadataOptional.isPresent()) {
+ String epochStr = (epoch.isPresent()) ?
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+ throw new OffsetOutOfRangeException("Received request for offset "
+ offset + " for leader epoch "
+ + epochStr + " and partition " + tp + " which does not
exist in remote tier.");
+ }
+
+ RemoteLogSegmentMetadata remoteLogSegmentMetadata =
rlsMetadataOptional.get();
+ int startPos = lookupPositionForOffset(remoteLogSegmentMetadata,
offset);
+ InputStream remoteSegInputStream = null;
+ try {
+ // Search forward for the position of the last offset that is
greater than or equal to the target offset
+ remoteSegInputStream =
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+ RemoteLogInputStream remoteLogInputStream = new
RemoteLogInputStream(remoteSegInputStream);
+
+ RecordBatch firstBatch = findFirstBatch(remoteLogInputStream,
offset);
+
+ if (firstBatch == null)
+ return new FetchDataInfo(new LogOffsetMetadata(offset),
MemoryRecords.EMPTY, false,
+ includeAbortedTxns ?
Optional.of(Collections.emptyList()) : Optional.empty());
+
+ int firstBatchSize = firstBatch.sizeInBytes();
+ // An empty record is sent instead of an incomplete batch when
+ // - there is no minimum-one-message constraint and
+ // - the first batch size is more than maximum bytes that can be
sent and
+ // - for FetchRequest version 3 or above.
+ if (!remoteStorageFetchInfo.minOneMessage &&
+ !remoteStorageFetchInfo.hardMaxBytesLimit &&
+ firstBatchSize > maxBytes) {
+ return new FetchDataInfo(new LogOffsetMetadata(offset),
MemoryRecords.EMPTY);
+ }
+
+ int updatedFetchSize =
+ remoteStorageFetchInfo.minOneMessage && firstBatchSize >
maxBytes ? firstBatchSize : maxBytes;
+
+ ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+ int remainingBytes = updatedFetchSize;
+
+ firstBatch.writeTo(buffer);
+ remainingBytes -= firstBatchSize;
+
+ if (remainingBytes > 0) {
+ // read the input stream until min of (EOF stream or buffer's
remaining capacity).
+ Utils.readFully(remoteSegInputStream, buffer);
+ }
+ buffer.flip();
+
+ FetchDataInfo fetchDataInfo = new FetchDataInfo(
+ new LogOffsetMetadata(offset,
remoteLogSegmentMetadata.startOffset(), startPos),
+ MemoryRecords.readableRecords(buffer));
+ if (includeAbortedTxns) {
+ fetchDataInfo =
addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata,
fetchDataInfo, logOptional.get());
+ }
+
+ return fetchDataInfo;
+ } finally {
+ Utils.closeQuietly(remoteSegInputStream,
"RemoteLogSegmentInputStream");
+ }
+ }
+
+ private int lookupPositionForOffset(RemoteLogSegmentMetadata
remoteLogSegmentMetadata, long offset) {
+ return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+ }
+
+ private FetchDataInfo addAbortedTransactions(long startOffset,
+ RemoteLogSegmentMetadata
segmentMetadata,
+ FetchDataInfo fetchInfo,
+ UnifiedLog log) throws
RemoteStorageException {
+ int fetchSize = fetchInfo.records.sizeInBytes();
+ OffsetPosition startOffsetPosition = new
OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+ fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+ OffsetIndex offsetIndex =
indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+ long upperBoundOffset =
offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+ .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+ final Set<FetchResponseData.AbortedTransaction> abortedTransactions =
new HashSet<>();
+
+ Consumer<List<AbortedTxn>> accumulator =
+ abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+ collectAbortedTransactions(startOffset, upperBoundOffset,
segmentMetadata, accumulator, log);
+
+ return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+ fetchInfo.records,
+ fetchInfo.firstEntryIncomplete,
+ Optional.of(abortedTransactions.isEmpty() ?
Collections.emptyList() : new ArrayList<>(abortedTransactions)));
+ }
+
+ private void collectAbortedTransactions(long startOffset,
+ long upperBoundOffset,
+ RemoteLogSegmentMetadata
segmentMetadata,
+ Consumer<List<AbortedTxn>>
accumulator,
+ UnifiedLog log) throws
RemoteStorageException {
+ // Search in remote segments first.
+ Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt =
Optional.of(segmentMetadata);
+ while (nextSegmentMetadataOpt.isPresent()) {
+ Optional<TransactionIndex> txnIndexOpt =
nextSegmentMetadataOpt.map(metadata ->
indexCache.getIndexEntry(metadata).txnIndex());
+ if (txnIndexOpt.isPresent()) {
+ TxnIndexSearchResult searchResult =
txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+ accumulator.accept(searchResult.abortedTransactions);
+ if (searchResult.isComplete) {
+ // Return immediately when the search result is complete,
it does not need to go through local log segments.
+ return;
}
}
+
+ nextSegmentMetadataOpt =
findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
+ }
+
+ // Search in local segments
+ collectAbortedTransactionInLocalSegments(startOffset,
upperBoundOffset, accumulator,
JavaConverters.asJavaIterator(log.logSegments().iterator()));
+ }
+
+ private void collectAbortedTransactionInLocalSegments(long startOffset,
+ long
upperBoundOffset,
+
Consumer<List<AbortedTxn>> accumulator,
+ Iterator<LogSegment>
localLogSegments) {
+ while (localLogSegments.hasNext()) {
+ TransactionIndex txnIndex = localLogSegments.next().txnIndex();
+ if (txnIndex != null) {
+ TxnIndexSearchResult searchResult =
txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
+ accumulator.accept(searchResult.abortedTransactions);
+ if (searchResult.isComplete) {
+ return;
+ }
+ }
+ }
+ }
+
+ private Optional<RemoteLogSegmentMetadata>
findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+
Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws
RemoteStorageException {
+ if (leaderEpochFileCacheOption.isEmpty()) {
+ return Optional.empty();
+ }
+
+ long nextSegmentBaseOffset = segmentMetadata.endOffset() + 1;
+ OptionalInt epoch =
leaderEpochFileCacheOption.get().epochForOffset(nextSegmentBaseOffset);
+ return epoch.isPresent()
+ ?
fetchRemoteLogSegmentMetadata(segmentMetadata.topicIdPartition().topicPartition(),
epoch.getAsInt(), nextSegmentBaseOffset)
+ : Optional.empty();
+ }
+
+ private RecordBatch findFirstBatch(RemoteLogInputStream
remoteLogInputStream, long offset) throws IOException {
+ RecordBatch nextBatch = null;
+ // Look for the batch which has the desired offset
+ // We will always have a batch in that segment as it is a
non-compacted topic.
+ do {
+ nextBatch = remoteLogInputStream.nextBatch();
+ } while (nextBatch != null && nextBatch.lastOffset() < offset);
+
+ return nextBatch;
+ }
+
+ long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog
log) throws RemoteStorageException {
+ Optional<Long> offset = Optional.empty();
+
+ Option<LeaderEpochFileCache> maybeLeaderEpochFileCache =
log.leaderEpochCache();
+ if (maybeLeaderEpochFileCache.isDefined()) {
+ LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
+ OptionalInt epoch = cache.latestEpoch();
+ while (!offset.isPresent() && epoch.isPresent()) {
+ offset =
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition,
epoch.getAsInt());
+ epoch = cache.previousEpoch(epoch.getAsInt());
+ }
}
return offset.orElse(-1L);
}
+ /**
+ * Submit a remote log read task.
+ *
+ * This method returns immediately. The read operation is executed in a
thread pool.
+ * The callback will be called when the task is done.
+ *
+ * @throws java.util.concurrent.RejectedExecutionException if the task
cannot be accepted for execution (task queue is full)
+ */
+ public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo,
Consumer<RemoteLogReadResult> callback) {
+ return remoteStorageReaderThreadPool.submit(new
RemoteLogReader(fetchInfo, this, callback));
+ }
+
void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
Consumer<RLMTask>
convertToLeaderOrFollower) {
RLMTaskWithFuture rlmTaskWithFuture =
leaderOrFollowerTasks.computeIfAbsent(topicPartition,
@@ -665,17 +875,40 @@ public class RemoteLogManager implements Closeable {
Utils.closeQuietly(remoteLogStorageManager,
"RemoteLogStorageManager");
Utils.closeQuietly(remoteLogMetadataManager,
"RemoteLogMetadataManager");
Utils.closeQuietly(indexCache, "RemoteIndexCache");
- try {
- rlmScheduledThreadPool.shutdown();
- } catch (InterruptedException e) {
- // ignore
- }
+
+ rlmScheduledThreadPool.close();
+ shutdownAndAwaitTermination(remoteStorageReaderThreadPool,
"RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS);
+
leaderOrFollowerTasks.clear();
closed = true;
}
}
}
+ private static void shutdownAndAwaitTermination(ExecutorService pool,
String poolName, long timeout, TimeUnit timeUnit) {
+ // This pattern of shutting down thread pool is adopted from here:
https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html
+ LOGGER.info("Shutting down of thread pool {} is started", poolName);
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(timeout, timeUnit)) {
+ LOGGER.info("Shutting down of thread pool {} could not be
completed. It will retry cancelling the tasks using shutdownNow.", poolName);
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(timeout, timeUnit))
+ LOGGER.warn("Shutting down of thread pool {} could not be
completed even after retrying cancellation of the tasks using shutdownNow.",
poolName);
+ }
+ } catch (InterruptedException ex) {
+ // (Re-)Cancel if current thread also interrupted
+ LOGGER.warn("Encountered InterruptedException while shutting down
thread pool {}. It will retry cancelling the tasks using shutdownNow.",
poolName);
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+
+ LOGGER.info("Shutting down of thread pool {} is completed", poolName);
+ }
+
static class RLMScheduledThreadPool {
private static final Logger LOGGER =
LoggerFactory.getLogger(RLMScheduledThreadPool.class);
@@ -708,11 +941,8 @@ public class RemoteLogManager implements Closeable {
return scheduledThreadPool.scheduleWithFixedDelay(runnable,
initialDelay, delay, timeUnit);
}
- public boolean shutdown() throws InterruptedException {
- LOGGER.info("Shutting down scheduled thread pool");
- scheduledThreadPool.shutdownNow();
- //waits for 2 mins to terminate the current tasks
- return scheduledThreadPool.awaitTermination(2, TimeUnit.MINUTES);
+ public void close() {
+ shutdownAndAwaitTermination(scheduledThreadPool,
"RLMScheduledThreadPool", 10, TimeUnit.SECONDS);
}
}
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java
b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
new file mode 100644
index 00000000000..0ed7f722d5b
--- /dev/null
+++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+public class RemoteLogReader implements Callable<Void> {
+ private final Logger logger;
+ private final RemoteStorageFetchInfo fetchInfo;
+ private final RemoteLogManager rlm;
+ private final Consumer<RemoteLogReadResult> callback;
+
+ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
+ RemoteLogManager rlm,
+ Consumer<RemoteLogReadResult> callback) {
+ this.fetchInfo = fetchInfo;
+ this.rlm = rlm;
+ this.callback = callback;
+ logger = new LogContext() {
+ @Override
+ public String logPrefix() {
+ return "[" + Thread.currentThread().getName() + "]";
+ }
+ }.logger(RemoteLogReader.class);
+ }
+
+ @Override
+ public Void call() {
+ RemoteLogReadResult result;
+ try {
+ logger.debug("Reading records from remote storage for topic
partition {}", fetchInfo.topicPartition);
+
+ FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+ result = new RemoteLogReadResult(Optional.of(fetchDataInfo),
Optional.empty());
+ } catch (OffsetOutOfRangeException e) {
+ result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+ } catch (Exception e) {
+ logger.error("Error occurred while reading the remote data for
{}", fetchInfo.topicPartition, e);
+ result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+ }
+
+ logger.debug("Finished reading records from remote storage for topic
partition {}", fetchInfo.topicPartition);
+ callback.accept(result);
+
+ return null;
+ }
+}
diff --git
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 93d6f4ff3f3..5860aa17693 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -26,6 +26,7 @@ import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedProduce;
+import kafka.server.DelayedRemoteFetch;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
@@ -61,6 +62,7 @@ public class ReplicaManagerBuilder {
private Optional<DelayedOperationPurgatory<DelayedFetch>>
delayedFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>>
delayedDeleteRecordsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedElectLeader>>
delayedElectLeaderPurgatory = Optional.empty();
+ private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>>
delayedRemoteFetchPurgatory = Optional.empty();
private Optional<String> threadNamePrefix = Optional.empty();
private Long brokerEpoch = -1L;
private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager =
Optional.empty();
@@ -140,6 +142,11 @@ public class ReplicaManagerBuilder {
return this;
}
+ public ReplicaManagerBuilder
setDelayedRemoteFetchPurgatory(DelayedOperationPurgatory<DelayedRemoteFetch>
delayedRemoteFetchPurgatory) {
+ this.delayedRemoteFetchPurgatory =
Optional.of(delayedRemoteFetchPurgatory);
+ return this;
+ }
+
public ReplicaManagerBuilder
setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory<DelayedDeleteRecords>
delayedDeleteRecordsPurgatory) {
this.delayedDeleteRecordsPurgatory =
Optional.of(delayedDeleteRecordsPurgatory);
return this;
@@ -189,6 +196,7 @@ public class ReplicaManagerBuilder {
OptionConverters.toScala(delayedFetchPurgatory),
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
OptionConverters.toScala(delayedElectLeaderPurgatory),
+
OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(threadNamePrefix),
() -> brokerEpoch,
OptionConverters.toScala(addPartitionsToTxnManager));
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 0f37d292385..413373d65b0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -526,7 +526,7 @@ class Partition(val topicPartition: TopicPartition,
leaderReplicaIdOpt.filter(_ == localBrokerId)
}
- private def localLogWithEpochOrThrow(
+ def localLogWithEpochOrThrow(
currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean
): UnifiedLog = {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 9452f0afd93..d48a7da30aa 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -270,6 +270,7 @@ class BrokerServer(
isShuttingDown = isShuttingDown,
zkClient = None,
threadNamePrefix = None, // The ReplicaManager only runs on the
broker, and already includes the ID in thread names.
+ delayedRemoteFetchPurgatoryParam = None,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
addPartitionsToTxnManager = Some(addPartitionsToTxnManager)
)
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 423b7bcd223..9ce6082e76c 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -163,7 +163,7 @@ class DelayedFetch(
tp -> status.fetchInfo
}
- val logReadResults = replicaManager.readFromLocalLog(
+ val logReadResults = replicaManager.readFromLog(
params,
fetchInfos,
quota,
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
new file mode 100644
index 00000000000..5fcf851fe83
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.storage.internals.log.{FetchParams,
FetchPartitionData, LogOffsetMetadata, RemoteLogReadResult,
RemoteStorageFetchInfo}
+
+import java.util.concurrent.{CompletableFuture, Future}
+import java.util.{Optional, OptionalInt, OptionalLong}
+import scala.collection._
+
+/**
+ * A remote fetch operation that can be created by the replica manager and
watched
+ * in the remote fetch operation purgatory
+ */
+class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+                         remoteFetchResult: CompletableFuture[RemoteLogReadResult],
+                         remoteFetchInfo: RemoteStorageFetchInfo,
+                         fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+                         fetchParams: FetchParams,
+                         localReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                         replicaManager: ReplicaManager,
+                         responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
+  extends DelayedOperation(fetchParams.maxWaitMs) {
+
+  /**
+   * The operation can be completed if:
+   *
+   * Case a: This broker is no longer the leader of the partition it tries to fetch
+   * Case b: This broker does not know the partition it tries to fetch
+   * Case c: The remote storage read request completed (succeeded or failed)
+   * Case d: The partition is in an offline log directory on this broker
+   *
+   * Upon completion, should return whatever data is available for each valid partition
+   *
+   * @return the result of forceComplete() when one of the cases above applies, false otherwise
+   */
+  override def tryComplete(): Boolean = {
+    fetchPartitionStatus.foreach {
+      case (topicPartition, fetchStatus) =>
+        val fetchOffset = fetchStatus.startOffsetMetadata
+        try {
+          // The lookup is made only for the exceptions it may raise; its result is not used.
+          // Partitions whose start offset metadata is unknown are not checked.
+          if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+            replicaManager.getPartitionOrException(topicPartition.topicPartition())
+          }
+        } catch {
+          case _: KafkaStorageException => // Case d
+            debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchParams immediately")
+            return forceComplete()
+          case _: UnknownTopicOrPartitionException => // Case b
+            debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchParams immediately")
+            return forceComplete()
+          case _: NotLeaderOrFollowerException => // Case a
+            debug(s"Broker is no longer the leader or follower of $topicPartition, satisfy $fetchParams immediately")
+            return forceComplete()
+        }
+    }
+    if (remoteFetchResult.isDone) // Case c
+      forceComplete()
+    else
+      false
+  }
+
+  /**
+   * On expiration, try to cancel the remote storage read task and log at debug level when
+   * the cancellation did not succeed.
+   */
+  override def onExpiration(): Unit = {
+    // cancel the remote storage read task, if it has not been executed yet
+    val cancelled = remoteFetchTask.cancel(true)
+    if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
+  }
+
+  /**
+   * Upon completion, read whatever data is available and pass to the complete callback.
+   *
+   * The remote read result replaces the local read result only for the partition named by
+   * `remoteFetchInfo`, and only when the remote read is done, the local read reported no error
+   * and the local result was marked as requiring a remote storage fetch. Every other partition
+   * is answered with its local read result.
+   */
+  override def onComplete(): Unit = {
+    val fetchPartitionData = localReadResults.map { case (tp, result) =>
+      val useRemoteResult = tp.topicPartition().equals(remoteFetchInfo.topicPartition) &&
+        remoteFetchResult.isDone &&
+        result.error == Errors.NONE &&
+        result.info.delayedRemoteStorageFetch.isPresent
+
+      if (useRemoteResult) {
+        // The future is already done here, so get does not block.
+        val remoteResult = remoteFetchResult.get
+        if (remoteResult.error.isPresent) {
+          tp -> ReplicaManager.createLogReadResult(remoteResult.error.get).toFetchPartitionData(false)
+        } else {
+          val info = remoteResult.fetchDataInfo.get
+          // Offsets and error come from the local read; records and aborted transactions come from remote storage.
+          tp -> new FetchPartitionData(
+            result.error,
+            result.highWatermark,
+            result.leaderLogStartOffset,
+            info.records,
+            Optional.empty(),
+            if (result.lastStableOffset.isDefined) OptionalLong.of(result.lastStableOffset.get) else OptionalLong.empty(),
+            info.abortedTransactions,
+            if (result.preferredReadReplica.isDefined) OptionalInt.of(result.preferredReadReplica.get) else OptionalInt.empty(),
+            false)
+        }
+      } else {
+        tp -> result.toFetchPartitionData(false)
+      }
+    }
+
+    responseCallback(fetchPartitionData)
+  }
+}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7a3d2f225b6..261659747ef 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -629,6 +629,7 @@ class KafkaServer(
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
zkClient = Some(zkClient),
+ delayedRemoteFetchPurgatoryParam = None,
threadNamePrefix = threadNamePrefix,
brokerEpochSupplier = brokerEpochSupplier,
addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 92ddb59ed57..0027fe77afe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -24,6 +24,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.ReplicaManager.createLogReadResult
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile,
OffsetCheckpoints}
import kafka.server.metadata.ZkMetadataCache
import kafka.utils.Implicits._
@@ -55,16 +56,16 @@ import org.apache.kafka.common.{ElectionType,
IsolationLevel, Node, TopicIdParti
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig,
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig,
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo,
RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo}
import java.io.File
import java.nio.file.{Files, Paths}
import java.util
-import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
+import java.util.concurrent.{CompletableFuture, Future,
RejectedExecutionException, TimeUnit}
import java.util.{Optional, OptionalInt, OptionalLong}
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
@@ -175,6 +176,33 @@ object HostedPartition {
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
+
+ /**
+  * Creates a [[LogReadResult]] that carries the given exception together with the supplied
+  * leader offsets. The result holds no records, and its follower log start offset and fetch
+  * time are both set to -1.
+  *
+  * @param highWatermark        high watermark to report in the result
+  * @param leaderLogStartOffset leader log start offset to report in the result
+  * @param leaderLogEndOffset   leader log end offset to report in the result
+  * @param e                    the exception to attach to the result
+  * @return a record-less read result wrapping `e`
+  */
+ def createLogReadResult(highWatermark: Long,
+                         leaderLogStartOffset: Long,
+                         leaderLogEndOffset: Long,
+                         e: Throwable): LogReadResult = {
+   LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+     divergingEpoch = None,
+     highWatermark = highWatermark,
+     leaderLogStartOffset = leaderLogStartOffset,
+     leaderLogEndOffset = leaderLogEndOffset,
+     followerLogStartOffset = -1L,
+     fetchTimeMs = -1L,
+     lastStableOffset = None,
+     exception = Some(e))
+ }
+
+ /**
+  * Creates a [[LogReadResult]] that carries only the given exception: it holds no records and
+  * every offset field is set to `UnifiedLog.UnknownOffset`.
+  *
+  * @param e the exception to attach to the result
+  * @return a record-less read result wrapping `e`
+  */
+ def createLogReadResult(e: Throwable): LogReadResult = {
+   val noRecords = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)
+   val unknownOffset = UnifiedLog.UnknownOffset
+   LogReadResult(
+     info = noRecords,
+     divergingEpoch = None,
+     highWatermark = unknownOffset,
+     leaderLogStartOffset = unknownOffset,
+     leaderLogEndOffset = unknownOffset,
+     followerLogStartOffset = unknownOffset,
+     fetchTimeMs = -1L,
+     lastStableOffset = None,
+     exception = Some(e)
+   )
+ }
}
class ReplicaManager(val config: KafkaConfig,
@@ -194,6 +222,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedFetchPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedFetch]] = None,
delayedDeleteRecordsPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
delayedElectLeaderPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
+ delayedRemoteFetchPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
threadNamePrefix: Option[String] = None,
val brokerEpochSupplier: () => Long = () => -1,
addPartitionsToTxnManager:
Option[AddPartitionsToTxnManager] = None
@@ -215,6 +244,9 @@ class ReplicaManager(val config: KafkaConfig,
val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "ElectLeader", brokerId = config.brokerId))
+ val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
+ DelayedOperationPurgatory[DelayedRemoteFetch](
+ purgatoryName = "RemoteFetch", brokerId = config.brokerId))
/* epoch of the controller that last changed the leader */
@volatile private[server] var controllerEpoch: Int =
KafkaController.InitialControllerEpoch
@@ -330,6 +362,7 @@ class ReplicaManager(val config: KafkaConfig,
val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
+ delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
}
/**
@@ -610,7 +643,7 @@ class ReplicaManager(val config: KafkaConfig,
* Noted that all pending delayed check operations are stored in a queue.
All callers to ReplicaManager.appendRecords()
* are expected to call ActionQueue.tryCompleteActions for all affected
partitions, without holding any conflicting
* locks.
- *
+ *
* @param timeout maximum time we will wait to append
before returning
* @param requiredAcks number of replicas who must
acknowledge the append before sending the response
* @param internalTopicsAllowed boolean indicating whether internal
topics can be appended to
@@ -638,7 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
val sTime = time.milliseconds
val transactionalProducerIds = mutable.HashSet[Long]()
- val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) =
+ val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) =
if (transactionStatePartition.isEmpty ||
!config.transactionPartitionVerificationEnable)
(entriesPerPartition, Map.empty)
else {
@@ -648,7 +681,7 @@ class ReplicaManager(val config: KafkaConfig,
transactionalBatches.foreach(batch =>
transactionalProducerIds.add(batch.producerId))
if (transactionalBatches.nonEmpty) {
getPartitionOrException(topicPartition).hasOngoingTransaction(transactionalBatches.head.producerId)
- } else {
+ } else {
// If there is no producer ID in the batches, no need to verify.
true
}
@@ -1121,21 +1154,69 @@ class ReplicaManager(val config: KafkaConfig,
partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets,
isFromConsumer, fetchOnlyFromLeader)
}
+ /**
+ * Submits an asynchronous remote storage read for `remoteFetchInfo` and registers a
+ * [[DelayedRemoteFetch]] in the remote fetch purgatory, watched under the key of the
+ * partition being read remotely.
+ *
+ * @param remoteFetchInfo describes the remote read to perform
+ * @param params the fetch parameters of the originating request
+ * @param responseCallback callback handed to the delayed operation to send the response
+ * @param logReadResults the read results already obtained for the partitions of the request
+ * @param fetchPartitionStatus per-partition fetch status handed to the delayed operation
+ * @return Some([[LogReadResult]]) carrying the error if the remote read task could not be
+ * scheduled (RejectedExecutionException), otherwise [[None]]
+ */
+ private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo,
+ params: FetchParams,
+ responseCallback: Seq[(TopicIdPartition,
FetchPartitionData)] => Unit,
+ logReadResults: Seq[(TopicIdPartition,
LogReadResult)],
+ fetchPartitionStatus: Seq[(TopicIdPartition,
FetchPartitionStatus)]): Option[LogReadResult] = {
+ // Purgatory key built from the topic and partition of the remote read
+ val key = new
TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(),
remoteFetchInfo.topicPartition.partition())
+ val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
+ var remoteFetchTask: Future[Void] = null
+ try {
+ remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo,
(result: RemoteLogReadResult) => {
+ // Publish the result first, then ask the purgatory to re-check operations watching this key
+ remoteFetchResult.complete(result)
+ delayedRemoteFetchPurgatory.checkAndComplete(key)
+ })
+ } catch {
+ case e: RejectedExecutionException =>
+ // Return the error if any in scheduling the remote fetch task
+ return Some(createLogReadResult(e))
+ }
+
+ val remoteFetch = new DelayedRemoteFetch(remoteFetchTask,
remoteFetchResult, remoteFetchInfo,
+ fetchPartitionStatus, params, logReadResults, this, responseCallback)
+
+ // Complete right away if already possible, otherwise watch the operation under `key`
+ delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
+ None
+ }
+
+ /**
+  * Converts the given read results into fetch partition data, substituting `error` for the
+  * read result of the partition equal to `remoteFetchTopicPartition`.
+  *
+  * @param logReadResults            read results per partition, in response order
+  * @param remoteFetchTopicPartition the partition whose result is replaced by `error`
+  * @param error                     the read result to report for that partition
+  * @return one entry per element of `logReadResults`, in the same order
+  */
+ private def buildPartitionToFetchPartitionData(logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                                                remoteFetchTopicPartition: TopicPartition,
+                                                error: LogReadResult): Seq[(TopicIdPartition, FetchPartitionData)] = {
+   logReadResults.map { case (topicIdPartition, localResult) =>
+     val isRemoteFetchPartition = topicIdPartition.topicPartition().equals(remoteFetchTopicPartition)
+     val resultToReport = if (isRemoteFetchPartition) error else localResult
+     (topicIdPartition, resultToReport.toFetchPartitionData(false))
+   }
+ }
+
/**
* Fetch messages from a replica, and wait until enough data can be fetched
and return;
* the callback function will be triggered either when timeout or required
fetch info is satisfied.
* Consumers may fetch from any replica, but followers can only fetch from
the leader.
*/
- def fetchMessages(
- params: FetchParams,
- fetchInfos: Seq[(TopicIdPartition, PartitionData)],
- quota: ReplicaQuota,
- responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
- ): Unit = {
+ def fetchMessages(params: FetchParams,
+ fetchInfos: Seq[(TopicIdPartition, PartitionData)],
+ quota: ReplicaQuota,
+ responseCallback: Seq[(TopicIdPartition,
FetchPartitionData)] => Unit): Unit = {
+
// check if this fetch request can be satisfied right away
- val logReadResults = readFromLocalLog(params, fetchInfos, quota,
readFromPurgatory = false)
+ val logReadResults = readFromLog(params, fetchInfos, quota,
readFromPurgatory = false)
var bytesReadable: Long = 0
var errorReadingData = false
+
+ // The 1st topic-partition that has to be read from remote storage
+ var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
+
var hasDivergingEpoch = false
var hasPreferredReadReplica = false
val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
@@ -1145,6 +1226,9 @@ class ReplicaManager(val config: KafkaConfig,
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
if (logReadResult.error != Errors.NONE)
errorReadingData = true
+ if (!remoteFetchInfo.isPresent &&
logReadResult.info.delayedRemoteStorageFetch.isPresent) {
+ remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
+ }
if (logReadResult.divergingEpoch.nonEmpty)
hasDivergingEpoch = true
if (logReadResult.preferredReadReplica.nonEmpty)
@@ -1153,14 +1237,15 @@ class ReplicaManager(val config: KafkaConfig,
logReadResultMap.put(topicIdPartition, logReadResult)
}
- // respond immediately if 1) fetch request does not want to wait
+ // Respond immediately if no remote fetches are required and any of the
below conditions is true
+ // 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
// 5) we found a diverging epoch
// 6) has a preferred read replica
- if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >=
params.minBytes || errorReadingData ||
- hasDivergingEpoch || hasPreferredReadReplica) {
+ if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 ||
fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
+ hasDivergingEpoch || hasPreferredReadReplica)) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = params.isFromFollower &&
isAddingReplica(tp.topicPartition, params.replicaId)
tp -> result.toFetchPartitionData(isReassignmentFetch)
@@ -1175,49 +1260,77 @@ class ReplicaManager(val config: KafkaConfig,
fetchPartitionStatus += (topicIdPartition ->
FetchPartitionStatus(logOffsetMetadata, partitionData))
})
}
- val delayedFetch = new DelayedFetch(
- params = params,
- fetchPartitionStatus = fetchPartitionStatus,
- replicaManager = this,
- quota = quota,
- responseCallback = responseCallback
- )
-
- // create a list of (topic, partition) pairs to use as keys for this
delayed fetch operation
- val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) =>
TopicPartitionOperationKey(tp) }
-
- // try to complete the request immediately, otherwise put it into the
purgatory;
- // this is because while the delayed fetch operation is being created,
new requests
- // may arrive and hence make this operation completable.
- delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch,
delayedFetchKeys)
+
+ if (remoteFetchInfo.isPresent) {
+ val maybeLogReadResultWithError =
processRemoteFetch(remoteFetchInfo.get(), params, responseCallback,
logReadResults, fetchPartitionStatus)
+ if (maybeLogReadResultWithError.isDefined) {
+ // If there is an error in scheduling the remote fetch task, return
what we currently have
+ // (the data read from local log segment for the other
topic-partitions) and an error for the topic-partition
+ // that we couldn't read from remote storage
+ val partitionToFetchPartitionData =
buildPartitionToFetchPartitionData(logReadResults,
remoteFetchInfo.get().topicPartition, maybeLogReadResultWithError.get)
+ responseCallback(partitionToFetchPartitionData)
+ }
+ } else {
+ // If there is not enough data to respond and there is no remote data,
we will let the fetch request
+ // wait for new data.
+ val delayedFetch = new DelayedFetch(
+ params = params,
+ fetchPartitionStatus = fetchPartitionStatus,
+ replicaManager = this,
+ quota = quota,
+ responseCallback = responseCallback
+ )
+
+ // create a list of (topic, partition) pairs to use as keys for this
delayed fetch operation
+ val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) =>
TopicPartitionOperationKey(tp) }
+
+ // try to complete the request immediately, otherwise put it into the
purgatory;
+ // this is because while the delayed fetch operation is being created,
new requests
+ // may arrive and hence make this operation completable.
+ delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch,
delayedFetchKeys)
+ }
}
}
/**
* Read from multiple topic partitions at the given offset up to maxSize
bytes
*/
- def readFromLocalLog(
+ def readFromLog(
params: FetchParams,
readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
quota: ReplicaQuota,
- readFromPurgatory: Boolean
- ): Seq[(TopicIdPartition, LogReadResult)] = {
+ readFromPurgatory: Boolean): Seq[(TopicIdPartition, LogReadResult)] = {
val traceEnabled = isTraceEnabled
+ // Returns the fetched data unchanged, or an empty record set at the same fetch offset metadata
+ // when the records must be withheld from the response.
+ def checkFetchDataInfo(partition: Partition, givenFetchedDataInfo: FetchDataInfo) = {
+   val withholdRecords =
+     // If the partition is being throttled, simply return an empty set.
+     (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) ||
+       // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
+       // progress in such cases and don't need to report a `RecordTooLargeException`
+       (!params.hardMaxBytesLimit && givenFetchedDataInfo.firstEntryIncomplete)
+
+   if (withholdRecords)
+     new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata, MemoryRecords.EMPTY)
+   else
+     givenFetchedDataInfo
+ }
+
def read(tp: TopicIdPartition, fetchInfo: PartitionData, limitBytes: Int,
minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.fetchOffset
val partitionFetchSize = fetchInfo.maxBytes
val followerLogStartOffset = fetchInfo.logStartOffset
val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
+ var log: UnifiedLog = null
+ var partition : Partition = null
+ val fetchTimeMs = time.milliseconds
try {
if (traceEnabled)
trace(s"Fetching log segment for partition $tp, offset $offset,
partition fetch size $partitionFetchSize, " +
s"remaining response limit $limitBytes" +
(if (minOneMessage) s", ignoring response/partition size limits"
else ""))
- val partition = getPartitionOrException(tp.topicPartition)
- val fetchTimeMs = time.milliseconds
+ partition = getPartitionOrException(tp.topicPartition)
// Check if topic ID from the fetch request/session matches the ID in
the log
val topicId = if (tp.topicId == Uuid.ZERO_UUID) None else
Some(tp.topicId)
@@ -1246,6 +1359,8 @@ class ReplicaManager(val config: KafkaConfig,
preferredReadReplica = preferredReadReplica,
exception = None)
} else {
+ log =
partition.localLogWithEpochOrThrow(fetchInfo.currentLeaderEpoch,
params.fetchOnlyLeader())
+
// Try the read first, this tells us whether we need all of
adjustedFetchSize for this partition
val readInfo: LogReadInfo = partition.fetchRecords(
fetchParams = params,
@@ -1253,19 +1368,9 @@ class ReplicaManager(val config: KafkaConfig,
fetchTimeMs = fetchTimeMs,
maxBytes = adjustedMaxBytes,
minOneMessage = minOneMessage,
- updateFetchState = !readFromPurgatory
- )
+ updateFetchState = !readFromPurgatory)
- val fetchDataInfo = if (params.isFromFollower &&
shouldLeaderThrottle(quota, partition, params.replicaId)) {
- // If the partition is being throttled, simply return an empty set.
- new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata,
MemoryRecords.EMPTY)
- } else if (!params.hardMaxBytesLimit &&
readInfo.fetchedData.firstEntryIncomplete) {
- // For FetchRequest version 3, we replace incomplete message sets
with an empty one as consumers can make
- // progress in such cases and don't need to report a
`RecordTooLargeException`
- new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata,
MemoryRecords.EMPTY)
- } else {
- readInfo.fetchedData
- }
+ val fetchDataInfo = checkFetchDataInfo(partition,
readInfo.fetchedData)
LogReadResult(info = fetchDataInfo,
divergingEpoch = readInfo.divergingEpoch.asScala,
@@ -1288,17 +1393,10 @@ class ReplicaManager(val config: KafkaConfig,
_: FencedLeaderEpochException |
_: ReplicaNotAvailableException |
_: KafkaStorageException |
- _: OffsetOutOfRangeException |
_: InconsistentTopicIdException) =>
- LogReadResult(info = new
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
- divergingEpoch = None,
- highWatermark = UnifiedLog.UnknownOffset,
- leaderLogStartOffset = UnifiedLog.UnknownOffset,
- leaderLogEndOffset = UnifiedLog.UnknownOffset,
- followerLogStartOffset = UnifiedLog.UnknownOffset,
- fetchTimeMs = -1L,
- lastStableOffset = None,
- exception = Some(e))
+ createLogReadResult(e)
+ case e: OffsetOutOfRangeException =>
+ handleOffsetOutOfRangeError(tp, params, fetchInfo, adjustedMaxBytes,
minOneMessage, log, fetchTimeMs, e)
case e: Throwable =>
brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
@@ -1335,6 +1433,50 @@ class ReplicaManager(val config: KafkaConfig,
result
}
+ /**
+  * Builds the [[LogReadResult]] for a partition read that failed with an
+  * OffsetOutOfRangeException.
+  *
+  * When the fetch offset lies in [logStartOffset, localLogStartOffset) of a log with remote
+  * storage enabled, a follower fetch gets an OffsetMovedToTieredStorageException along with the
+  * leader offsets, and any other fetch gets an empty result carrying the
+  * [[RemoteStorageFetchInfo]] needed to read the data from remote storage. In every other case
+  * the original exception is returned.
+  *
+  * @param tp               the partition being read
+  * @param params           the fetch parameters of the request
+  * @param fetchInfo        the partition-level fetch data (offset, log start offset, ...)
+  * @param adjustedMaxBytes the byte limit passed on to the remote read
+  * @param minOneMessage    passed on to the remote read
+  * @param log              the local log; may be null if it was not resolved before the failure
+  * @param fetchTimeMs      the time of the fetch, recorded in the result
+  * @param exception        the original out-of-range exception
+  * @return the read result to report for this partition
+  */
+ private def handleOffsetOutOfRangeError(tp: TopicIdPartition,
+                                         params: FetchParams,
+                                         fetchInfo: PartitionData,
+                                         adjustedMaxBytes: Int,
+                                         minOneMessage: Boolean,
+                                         log: UnifiedLog,
+                                         fetchTimeMs: Long,
+                                         exception: OffsetOutOfRangeException): LogReadResult = {
+   val offset = fetchInfo.fetchOffset
+   // In case of offset out of range errors, handle it for tiered storage only if all the below conditions are true.
+   //   1) remote log manager is enabled and it is available
+   //   2) `log` instance should not be null here as that would have been caught earlier with
+   //      NotLeaderOrFollowerException or ReplicaNotAvailableException.
+   //   3) fetch offset is within the offset range of the remote storage layer
+   if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() &&
+     log.logStartOffset <= offset && offset < log.localLogStartOffset()) {
+     val highWatermark = log.highWatermark
+     val leaderLogStartOffset = log.logStartOffset
+     val leaderLogEndOffset = log.logEndOffset
+
+     if (params.isFromFollower) {
+       // If it is from a follower then send the offset metadata only as the data is already available in remote
+       // storage and throw an error saying that this offset is moved to tiered storage.
+       createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
+         new OffsetMovedToTieredStorageException(s"Given offset $offset is moved to tiered storage"))
+     } else {
+       // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
+       // For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
+       val fetchDataInfo =
+         new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
+           Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
+             fetchInfo, params.isolation, params.hardMaxBytesLimit())))
+
+       LogReadResult(info = fetchDataInfo,
+         divergingEpoch = None,
+         highWatermark = highWatermark,
+         leaderLogStartOffset = leaderLogStartOffset,
+         leaderLogEndOffset = leaderLogEndOffset,
+         followerLogStartOffset = fetchInfo.logStartOffset,
+         fetchTimeMs = fetchTimeMs,
+         lastStableOffset = Some(log.lastStableOffset),
+         exception = None)
+     }
+   } else {
+     createLogReadResult(exception)
+   }
+ }
+
/**
* Using the configured [[ReplicaSelector]], determine the preferred read
replica for a partition given the
* client metadata, the requested offset, and the current set of replicas.
If the preferred read replica is the
@@ -2045,6 +2187,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.shutdown()
replicaAlterLogDirsManager.shutdown()
delayedFetchPurgatory.shutdown()
+ delayedRemoteFetchPurgatory.shutdown()
delayedProducePurgatory.shutdown()
delayedDeleteRecordsPurgatory.shutdown()
delayedElectLeaderPurgatory.shutdown()
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index c4cfc00b57b..9b9ad697eb8 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -167,11 +167,11 @@ public class RemoteLogManagerTest {
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
- long offset = remoteLogManager.findHighestRemoteOffset(tpId);
+ long offset = remoteLogManager.findHighestRemoteOffset(tpId, mockLog);
assertEquals(-1, offset);
when(remoteLogMetadataManager.highestOffsetForEpoch(tpId,
2)).thenReturn(Optional.of(200L));
- long offset2 = remoteLogManager.findHighestRemoteOffset(tpId);
+ long offset2 = remoteLogManager.findHighestRemoteOffset(tpId, mockLog);
assertEquals(200, offset2);
}
@@ -261,7 +261,7 @@ public class RemoteLogManagerTest {
RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition);
task.convertToLeader(2);
- task.copyLogSegmentsToRemote();
+ task.copyLogSegmentsToRemote(mockLog);
// verify remoteLogMetadataManager did add the expected
RemoteLogSegmentMetadata
ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg =
ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
@@ -318,7 +318,7 @@ public class RemoteLogManagerTest {
RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition);
task.convertToFollower();
- task.copyLogSegmentsToRemote();
+ task.copyLogSegmentsToRemote(mockLog);
// verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
new file mode 100644
index 00000000000..aa8dd042a13
--- /dev/null
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogReaderTest {
+ RemoteLogManager mockRLM = mock(RemoteLogManager.class);
+ LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
+ Records records = mock(Records.class);
+
+ @Test
+ public void testRemoteLogReaderWithoutError() throws
RemoteStorageException, IOException {
+ FetchDataInfo fetchDataInfo = new FetchDataInfo(logOffsetMetadata,
records);
+
when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
+
+ Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
+ RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null,
false);
+ RemoteLogReader remoteLogReader = new
RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+ remoteLogReader.call();
+
+ // verify the callback did get invoked with the expected
remoteLogReadResult
+ ArgumentCaptor<RemoteLogReadResult> remoteLogReadResultArg =
ArgumentCaptor.forClass(RemoteLogReadResult.class);
+ verify(callback, times(1)).accept(remoteLogReadResultArg.capture());
+ RemoteLogReadResult actualRemoteLogReadResult =
remoteLogReadResultArg.getValue();
+ assertFalse(actualRemoteLogReadResult.error.isPresent());
+ assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
+ assertEquals(fetchDataInfo,
actualRemoteLogReadResult.fetchDataInfo.get());
+ }
+
+ @Test
+ public void testRemoteLogReaderWithError() throws RemoteStorageException,
IOException {
+ when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new
OffsetOutOfRangeException("error"));
+
+ Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
+ RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null,
false);
+ RemoteLogReader remoteLogReader = new
RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+ remoteLogReader.call();
+
+ // verify the callback did get invoked with the expected
remoteLogReadResult
+ ArgumentCaptor<RemoteLogReadResult> remoteLogReadResultArg =
ArgumentCaptor.forClass(RemoteLogReadResult.class);
+ verify(callback, times(1)).accept(remoteLogReadResultArg.capture());
+ RemoteLogReadResult actualRemoteLogReadResult =
remoteLogReadResultArg.getValue();
+ assertTrue(actualRemoteLogReadResult.error.isPresent());
+ assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent());
+ }
+}
diff --git
a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 1710d713d0f..f26f7079d4b 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -188,7 +188,7 @@ class DelayedFetchTest {
fetchPartitionData: FetchRequest.PartitionData,
error: Errors
): Unit = {
- when(replicaManager.readFromLocalLog(
+ when(replicaManager.readFromLog(
fetchParams,
readPartitionInfo = Seq((topicIdPartition, fetchPartitionData)),
quota = replicaQuota,
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
new file mode 100644
index 00000000000..007154f46de
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.{mock, when}
+
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+
+import scala.collection._
+
+class DelayedRemoteFetchTest {
+ private val maxBytes = 1024
+ private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0,
"topic")
+ private val fetchOffset = 500L
+ private val logStartOffset = 0L
+ private val currentLeaderEpoch = Optional.of[Integer](10)
+ private val replicaId = 1
+
+ private val fetchStatus = FetchPartitionStatus(
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+ fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset,
logStartOffset, maxBytes, currentLeaderEpoch))
+ private val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs =
500)
+
+ @Test
+ def testFetch(): Unit = {
+ var actualTopicPartition: Option[TopicIdPartition] = None
+ var fetchResultOpt: Option[FetchPartitionData] = None
+
+ def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit
= {
+ assertEquals(1, responses.size)
+ actualTopicPartition = Some(responses.head._1)
+ fetchResultOpt = Some(responses.head._2)
+ }
+
+ val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+ future.complete(null)
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null, false)
+ val highWatermark = 100
+ val leaderLogStartOffset = 10
+ val logReadInfo = buildReadResult(Errors.NONE, highWatermark,
leaderLogStartOffset)
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo,
Seq(topicIdPartition -> fetchStatus), fetchParams,
+ Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+ when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+ .thenReturn(mock(classOf[Partition]))
+
+ assertTrue(delayedRemoteFetch.tryComplete())
+ assertTrue(delayedRemoteFetch.isCompleted)
+ assertTrue(actualTopicPartition.isDefined)
+ assertEquals(topicIdPartition, actualTopicPartition.get)
+ assertTrue(fetchResultOpt.isDefined)
+
+ val fetchResult = fetchResultOpt.get
+ assertEquals(Errors.NONE, fetchResult.error)
+ assertEquals(highWatermark, fetchResult.highWatermark)
+ assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
+ }
+
+ @Test
+ def testNotLeaderOrFollower(): Unit = {
+ var actualTopicPartition: Option[TopicIdPartition] = None
+ var fetchResultOpt: Option[FetchPartitionData] = None
+
+ def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit
= {
+ assertEquals(1, responses.size)
+ actualTopicPartition = Some(responses.head._1)
+ fetchResultOpt = Some(responses.head._2)
+ }
+
+ // throw exception while getPartition
+ when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+ .thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available"))
+
+ val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null, false)
+
+ val logReadInfo = buildReadResult(Errors.NONE)
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo,
Seq(topicIdPartition -> fetchStatus), fetchParams,
+ Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+ // delayed remote fetch should still be able to complete
+ assertTrue(delayedRemoteFetch.tryComplete())
+ assertTrue(delayedRemoteFetch.isCompleted)
+ assertEquals(topicIdPartition, actualTopicPartition.get)
+ assertTrue(fetchResultOpt.isDefined)
+ }
+
+ @Test
+ def testErrorLogReadInfo(): Unit = {
+ var actualTopicPartition: Option[TopicIdPartition] = None
+ var fetchResultOpt: Option[FetchPartitionData] = None
+
+ def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit
= {
+ assertEquals(1, responses.size)
+ actualTopicPartition = Some(responses.head._1)
+ fetchResultOpt = Some(responses.head._2)
+ }
+
+ when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+ .thenReturn(mock(classOf[Partition]))
+
+ val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+ future.complete(null)
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null, false)
+
+ // build a read result with error
+ val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo,
Seq(topicIdPartition -> fetchStatus), fetchParams,
+ Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+ assertTrue(delayedRemoteFetch.tryComplete())
+ assertTrue(delayedRemoteFetch.isCompleted)
+ assertEquals(topicIdPartition, actualTopicPartition.get)
+ assertTrue(fetchResultOpt.isDefined)
+ assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get.error)
+ }
+
+ private def buildFollowerFetchParams(replicaId: Int,
+ maxWaitMs: Int): FetchParams = {
+ new FetchParams(
+ ApiKeys.FETCH.latestVersion,
+ replicaId,
+ 1,
+ maxWaitMs,
+ 1,
+ maxBytes,
+ FetchIsolation.LOG_END,
+ Optional.empty()
+ )
+ }
+
+ private def buildReadResult(error: Errors,
+ highWatermark: Int = 0,
+ leaderLogStartOffset: Int = 0): LogReadResult = {
+ LogReadResult(
+ exception = if (error != Errors.NONE) Some(error.exception) else None,
+ info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
MemoryRecords.EMPTY),
+ divergingEpoch = None,
+ highWatermark = highWatermark,
+ leaderLogStartOffset = leaderLogStartOffset,
+ leaderLogEndOffset = -1L,
+ followerLogStartOffset = -1L,
+ fetchTimeMs = -1L,
+ lastStableOffset = None)
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 654ffb5ae8d..5419557b1e7 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -158,15 +158,20 @@ class ListOffsetsRequestTest extends BaseRequestTest {
private[this] def fetchOffsetAndEpoch(serverId: Int,
timestamp: Long,
version: Short): (Long, Int) = {
+ val (offset, leaderEpoch, _) = fetchOffsetAndEpochWithError(serverId,
timestamp, version)
+ (offset, leaderEpoch)
+ }
+
+ private[this] def fetchOffsetAndEpochWithError(serverId: Int, timestamp:
Long, version: Short): (Long, Int, Short) = {
val partitionData = sendRequest(serverId, timestamp, version)
if (version == 0) {
if (partitionData.oldStyleOffsets().isEmpty)
- (-1, partitionData.leaderEpoch)
+ (-1, partitionData.leaderEpoch, partitionData.errorCode())
else
- (partitionData.oldStyleOffsets().asScala.head,
partitionData.leaderEpoch)
+ (partitionData.oldStyleOffsets().asScala.head,
partitionData.leaderEpoch, partitionData.errorCode())
} else
- (partitionData.offset, partitionData.leaderEpoch)
+ (partitionData.offset, partitionData.leaderEpoch,
partitionData.errorCode())
}
@Test
@@ -202,8 +207,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId,
ListOffsetsRequest.EARLIEST_TIMESTAMP, -1))
// The latest offset reflects the updated epoch
- assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId,
ListOffsetsRequest.LATEST_TIMESTAMP, -1))
- assertEquals((9L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId,
ListOffsetsRequest.MAX_TIMESTAMP, -1))
+ assertEquals((10L, secondLeaderEpoch, Errors.NONE.code()),
fetchOffsetAndEpochWithError(secondLeaderId,
ListOffsetsRequest.LATEST_TIMESTAMP, -1))
+ assertEquals((9L, secondLeaderEpoch, Errors.NONE.code()),
fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP,
-1))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index ce727f83c74..4e35084d3bd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -67,7 +67,7 @@ class ReplicaManagerQuotasTest {
.thenReturn(true)
val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
- val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false)
+ val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false)
assertEquals(1, fetch.find(_._1 ==
topicIdPartition1).get._2.info.records.batches.asScala.size,
"Given two partitions, with only one throttled, we should get the first")
assertEquals(0, fetch.find(_._1 ==
topicIdPartition2).get._2.info.records.batches.asScala.size,
@@ -85,7 +85,7 @@ class ReplicaManagerQuotasTest {
.thenReturn(true)
val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
- val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false)
+ val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false)
assertEquals(0, fetch.find(_._1 ==
topicIdPartition1).get._2.info.records.batches.asScala.size,
"Given two partitions, with both throttled, we should get no messages")
assertEquals(0, fetch.find(_._1 ==
topicIdPartition2).get._2.info.records.batches.asScala.size,
@@ -103,7 +103,7 @@ class ReplicaManagerQuotasTest {
.thenReturn(false)
val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
- val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false)
+ val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false)
assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size,
"Given two partitions, with both non-throttled, we should get both messages")
assertEquals(1, fetch.find(_._1 ==
topicIdPartition2).get._2.info.records.batches.asScala.size,
@@ -121,7 +121,7 @@ class ReplicaManagerQuotasTest {
.thenReturn(true)
val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
- val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false)
+ val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false)
assertEquals(1, fetch.find(_._1 ==
topicIdPartition1).get._2.info.records.batches.asScala.size,
"Given two partitions, with only one throttled, we should get the first")
@@ -137,7 +137,7 @@ class ReplicaManagerQuotasTest {
when(quota.isQuotaExceeded).thenReturn(true)
val fetchParams = PartitionTest.consumerFetchParams()
- val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false).toMap
+ val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota,
readFromPurgatory = false).toMap
assertEquals(1, fetch(topicIdPartition1).info.records.batches.asScala.size,
"Replication throttled partitions should return data for consumer fetch")
assertEquals(1, fetch(topicIdPartition2).info.records.batches.asScala.size,
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java
index 48c16b9d57e..b5c1118dea1 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java
@@ -28,6 +28,7 @@ public class FetchDataInfo {
public final Records records;
public final boolean firstEntryIncomplete;
public final Optional<List<FetchResponseData.AbortedTransaction>>
abortedTransactions;
+ public final Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch;
public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
Records records) {
@@ -38,10 +39,19 @@ public class FetchDataInfo {
Records records,
boolean firstEntryIncomplete,
Optional<List<FetchResponseData.AbortedTransaction>>
abortedTransactions) {
+ this(fetchOffsetMetadata, records, firstEntryIncomplete,
abortedTransactions, Optional.empty());
+ }
+
+ public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+ Records records,
+ boolean firstEntryIncomplete,
+ Optional<List<FetchResponseData.AbortedTransaction>>
abortedTransactions,
+ Optional<RemoteStorageFetchInfo>
delayedRemoteStorageFetch) {
this.fetchOffsetMetadata = fetchOffsetMetadata;
this.records = records;
this.firstEntryIncomplete = firstEntryIncomplete;
this.abortedTransactions = abortedTransactions;
+ this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
}
public static FetchDataInfo empty(long fetchOffset) {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteLogReadResult.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteLogReadResult.java
new file mode 100644
index 00000000000..06c72ecb80c
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteLogReadResult.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.Optional;
+
+public class RemoteLogReadResult {
+ public final Optional<FetchDataInfo> fetchDataInfo;
+ public final Optional<Throwable> error;
+
+ public RemoteLogReadResult(Optional<FetchDataInfo> fetchDataInfo, Optional<Throwable> error) {
+ this.fetchDataInfo = fetchDataInfo;
+ this.error = error;
+ }
+}
+
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
new file mode 100644
index 00000000000..7e8752d703a
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+
+public class RemoteStorageFetchInfo {
+
+ public final int fetchMaxBytes;
+ public final boolean minOneMessage;
+ public final TopicPartition topicPartition;
+ public final FetchRequest.PartitionData fetchInfo;
+ public final FetchIsolation fetchIsolation;
+ public final boolean hardMaxBytesLimit;
+
+ public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicPartition topicPartition,
+ FetchRequest.PartitionData fetchInfo, FetchIsolation fetchIsolation,
+ boolean hardMaxBytesLimit) {
+ this.fetchMaxBytes = fetchMaxBytes;
+ this.minOneMessage = minOneMessage;
+ this.topicPartition = topicPartition;
+ this.fetchInfo = fetchInfo;
+ this.fetchIsolation = fetchIsolation;
+ this.hardMaxBytesLimit = hardMaxBytesLimit;
+ }
+
+ @Override
+ public String toString() {
+ return "RemoteStorageFetchInfo{" +
+ "fetchMaxBytes=" + fetchMaxBytes +
+ ", minOneMessage=" + minOneMessage +
+ ", topicPartition=" + topicPartition +
+ ", fetchInfo=" + fetchInfo +
+ ", fetchIsolation=" + fetchIsolation +
+ ", hardMaxBytesLimit=" + hardMaxBytesLimit +
+ '}';
+ }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
new file mode 100644
index 00000000000..2b7ae15b154
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RemoteStorageThreadPool extends ThreadPoolExecutor {
+ private final Logger logger;
+
+ public RemoteStorageThreadPool(String threadNamePrefix,
+ int numThreads,
+ int maxPendingTasks) {
+ super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxPendingTasks),
+ new RemoteStorageThreadFactory(threadNamePrefix));
+ logger = new LogContext() {
+ @Override
+ public String logPrefix() {
+ return "[" + Thread.currentThread().getName() + "]";
+ }
+ }.logger(RemoteStorageThreadPool.class);
+ }
+
+ @Override
+ protected void afterExecute(Runnable runnable, Throwable th) {
+ if (th != null) {
+ if (th instanceof FatalExitError) {
+ logger.error("Stopping the server as it encountered a fatal error.");
+ Exit.exit(((FatalExitError) th).statusCode());
+ } else {
+ if (!isShutdown())
+ logger.error("Error occurred while executing task: {}", runnable, th);
+ }
+ }
+ }
+
+ private static class RemoteStorageThreadFactory implements ThreadFactory {
+ private final String namePrefix;
+ private final AtomicInteger threadNumber = new AtomicInteger(0);
+
+ RemoteStorageThreadFactory(String namePrefix) {
+ this.namePrefix = namePrefix;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, namePrefix + threadNumber.getAndIncrement());
+ }
+
+ }
+}