junrao commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r804020896
########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -90,18 +90,42 @@ abstract class AbstractFetcherThread(name: String, protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] - protected def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] + protected def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] - protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long + /** + * Fetch the leader's local-log-start-offset and the respective leader-epoch tuple from leader replica for the given + * topic partition and the current leader-epoch of leader in this replica. + * + * @param topicPartition Review comment: Should we complete the javadoc? Ditto for fetchEarliestOffsetFromLeader(). ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -396,7 +421,12 @@ abstract class AbstractFetcherThread(name: String, case Errors.OFFSET_OUT_OF_RANGE => if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch)) partitionsWithError += topicPartition - + case Errors.OFFSET_MOVED_TO_TIERED_STORAGE => + debug(s"Received error related to offsets out of rage or moved to tiered storage, error code: ${partitionData.errorCode()}" Review comment: Is "related to offsets out of rage" true? Also, since we explained this error, do we need the error code? ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -715,6 +738,87 @@ abstract class AbstractFetcherThread(name: String, } } + /** + * Handle a partition whose offset is out of range and return a new fetch offset. + */ + private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = { + fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch, + (_, leaderLogStartOffset) => { + truncateFullyAndStartAt(topicPartition, leaderLogStartOffset) + leaderLogStartOffset + }, + // In this case, it will fetch from leader's log-start-offset like earlier instead of fetching from + // local-log-start-offset. This handles both the scenarios of whether tiered storage is enabled or not. + // If tiered storage is enabled, the next fetch result of fetching from log-start-offset may result in + // OffsetMovedToTieredStorage error and it will handle building the remote log state. + fetchFromLocalLogStartOffset = false) + } + + /** + * Handle the out of range error. Return false if + * 1) the request succeeded or + * 2) was fenced and this thread haven't received new epoch, + * which means we need not backoff and retry. True if there was a retriable error. + */ + private def handleOutOfRangeError(topicPartition: TopicPartition, + fetchState: PartitionFetchState, + requestEpoch: Optional[Integer]): Boolean = { + try { + val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch) + partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) + info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " + + s"out of range, which typically implies a leader change. 
Reset fetch offset to ${newFetchState.fetchOffset}") + false + } catch { + case _: FencedLeaderEpochException => + onPartitionFenced(topicPartition, requestEpoch) + + case e@(_: UnknownTopicOrPartitionException | + _: UnknownLeaderEpochException | + _: NotLeaderOrFollowerException) => + info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}") + true + + case e: Throwable => + error(s"Error getting offset for partition $topicPartition", e) + true + } + } + + /** + * Handle the offset out of range error or offset moved to tiered storage error. Review comment: Is "Handle the offset out of range error" true? ########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -235,22 +239,29 @@ class ReplicaFetcherThread(name: String, } } - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { - fetchOffsetFromLeader(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP) + override protected def fetchEarliestLocalOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_2_IV0) + fetchOffsetFromLeader(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) + else + fetchOffsetFromLeader(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP) + } + + override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { + fetchOffsetFromLeader(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP) } - override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { fetchOffsetFromLeader(topicPartition, currentLeaderEpoch, ListOffsetsRequest.LATEST_TIMESTAMP) } - private def fetchOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int, earliestOrLatest: Long): Long = { + private def fetchOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int, earliestOrLatest: Long): (Int, Long) = { Review comment: Could we change earliestOrLatest to sth more generic now that it can have 3 values? ########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -259,12 +270,12 @@ class ReplicaFetcherThread(name: String, val responsePartition = response.topics.asScala.find(_.name == topicPartition.topic).get .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get - Errors.forCode(responsePartition.errorCode) match { + Errors.forCode(responsePartition.errorCode) match { case Errors.NONE => if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) - responsePartition.offset + (responsePartition.leaderEpoch, responsePartition.offset ) else - responsePartition.oldStyleOffsets.get(0) + (responsePartition.leaderEpoch, responsePartition.oldStyleOffsets.get(0)) Review comment: There is no leader epoch before 0.10. So the leader epoch can just be -1. ########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -386,11 +397,147 @@ class ReplicaFetcherThread(name: String, } /** - * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, - * the quota is exceeded and the replica is not in sync. 
+ * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, + * the quota is exceeded and the replica is not in sync. */ private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = { !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded } + /** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. + */ + override protected def buildRemoteLogAuxState(partition: TopicPartition, + currentLeaderEpoch: Int, + leaderLocalLogStartOffset: Long, + epochForLeaderLocalLogStartOffset: Int, + leaderLogStartOffset: Long): Long = { + + def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = { + val previousEpoch = epoch - 1 + // Find the end-offset for the epoch earlier to the given epoch from the leader + val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition()) + .setCurrentLeaderEpoch(currentLeaderEpoch) + .setLeaderEpoch(previousEpoch)) + val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition) + if (maybeEpochEndOffset.isEmpty) { + throw new KafkaException("No response received for partition: " + partition); + } + + val epochEndOffset = maybeEpochEndOffset.get + if (epochEndOffset.errorCode() != Errors.NONE.code()) { + throw Errors.forCode(epochEndOffset.errorCode()).exception() + } + + epochEndOffset + } + + val log = replicaMgr.localLogOrException(partition) + val nextOffset = { + if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) { + if (replicaMgr.remoteLogManager.isEmpty) throw new IllegalStateException("RemoteLogManager is not yet instantiated") + + val rlm = replicaMgr.remoteLogManager.get + + // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache + // until that offset + val previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1 + val targetEpoch: Int = { + // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) + // will have the same epoch. + if (epochForLeaderLocalLogStartOffset == 0) { + epochForLeaderLocalLogStartOffset + } else { + // Fetch the earlier epoch/end-offset(exclusive) from the leader. + val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset) + // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive. + if (earlierEpochEndOffset.endOffset > previousOffsetToLeaderLocalLogStartOffset) { + // Always use the leader epoch from returned earlierEpochEndOffset. + // This gives the respective leader epoch, that will handle any gaps in epochs. + // For ex, leader epoch cache contains: + // leader-epoch start-offset + // 0 20 + // 1 85 + // <2> - gap no messages were appended in this leader epoch. + // 3 90 + // 4 98 + // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3. + // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90. + // So, for offset 89, we should return leader epoch as 1 like below. 
+ earlierEpochEndOffset.leaderEpoch() + } else epochForLeaderLocalLogStartOffset + } + } + + val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset) + + if (maybeRlsm.isPresent) { + val remoteLogSegmentMetadata = maybeRlsm.get() + // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start + // segments from (remoteLogSegmentMetadata.endOffset() + 1) + val nextOffset = remoteLogSegmentMetadata.endOffset() + 1 + val epochStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH) + val epochs = readLeaderEpochCheckpoint(epochStream) + + // Truncate the existing local log before restoring the leader epoch cache and producer snapshots. + truncateFullyAndStartAt(partition, nextOffset) + + log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented) + log.leaderEpochCache.foreach { cache => + epochs.foreach(epochEntry => + // Do not flush to file for each entry. + cache.assign(epochEntry.epoch, epochEntry.startOffset, flushToFile = false) + ) + // Flush the cache to the file. + cache.flush() + } + + debug(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " + + s"with size: ${epochs.size} for $partition") + + // Restore producer snapshot + val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, nextOffset) + val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp"); + // Copy it to snapshot file in atomic manner. + Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT), + tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING) + Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, false) + + // Reload producer snapshots. + log.producerStateManager.reloadSnapshots() + log.loadProducerState(nextOffset, reloadFromCleanShutdown = false) + debug(s"Built the leader epoch cache and producer snapshots from remote tier for $partition. " + + s"Active producers: ${log.producerStateManager.activeProducers.size}, " + + s"LeaderLogStartOffset: $leaderLogStartOffset, endOffset: $nextOffset") + + // Return the offset from which next fetch should happen. + nextOffset + } else { + throw new RemoteStorageException(s"Couldn't build the state from remote store for partition: $partition, " + + s"currentLeaderEpoch: $currentLeaderEpoch, leaderLocalLogStartOffset: $leaderLocalLogStartOffset, " + + s"leaderLogStartOffset: $leaderLogStartOffset, epoch: $targetEpoch as the previous remote log segment " + + s"metadata was not found") + } + + } else { Review comment: If we get here, it means the leader has started tiering the data but the follower hasn't received the remoteStorageEnable config yet. It seems that we should backoff and retry the same offset instead of just fetching from leaderLocalLogStartOffset? 
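To make the epoch-gap handling discussed in the `buildRemoteLogAuxState` snippet above easier to follow, here is a small standalone Scala sketch. It is an illustration only, not code from this PR: the real logic asks the leader via OffsetsForLeaderEpoch and uses `LeaderEpochFileCache`, while this sketch assumes a plain `TreeMap` keyed by start offset. It reproduces the worked example from the code comments: with epoch start offsets 0→20, 1→85, 3→90, 4→98 and no entry for epoch 2, offset 89 resolves to epoch 1 and offset 90 to epoch 3.

```scala
import java.util.TreeMap

// Standalone illustration of the epoch-gap example above (not the PR's code).
object EpochGapExample extends App {
  // start-offset -> leader-epoch, mirroring the leader epoch cache layout in the comment
  val startOffsetToEpoch = new TreeMap[Long, Int]()
  startOffsetToEpoch.put(20L, 0)
  startOffsetToEpoch.put(85L, 1)
  startOffsetToEpoch.put(90L, 3) // epoch 2 is skipped: no records were appended in that epoch
  startOffsetToEpoch.put(98L, 4)

  // The epoch owning an offset is the entry with the greatest start offset <= that offset.
  def epochForOffset(offset: Long): Option[Int] =
    Option(startOffsetToEpoch.floorEntry(offset)).map(_.getValue)

  println(epochForOffset(89L)) // Some(1) -- the expected epoch for (leaderLocalLogStartOffset - 1)
  println(epochForOffset(90L)) // Some(3) -- leaderLocalLogStartOffset itself
}
```

This is why the branch above prefers the leader epoch returned by `fetchEarlierEpochEndOffset` whenever the target offset falls before that earlier epoch's (exclusive) end offset.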
########## File path: core/src/main/scala/kafka/api/ApiVersion.scala ########## @@ -119,7 +119,12 @@ object ApiVersion { // Assume message format version is 3.0 (KIP-724) KAFKA_3_0_IV1, // Adds topic IDs to Fetch requests/responses (KIP-516) - KAFKA_3_1_IV0 + KAFKA_3_1_IV0, + // Introduce ListOffsets V8 that supports listing offsets by earliest local time stamp, Review comment: time stamp => timestamp ########## File path: core/src/main/scala/kafka/server/BrokerServer.scala ########## @@ -437,6 +446,19 @@ class BrokerServer( } } + protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = { + val remoteLogManagerConfig = new RemoteLogManagerConfig(config) + if (remoteLogManagerConfig.enableRemoteStorageSystem()) { + if(config.logDirs.size > 1) { Review comment: space after if ########## File path: core/src/main/scala/kafka/server/BrokerServer.scala ########## @@ -437,6 +446,19 @@ class BrokerServer( } } + protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = { + val remoteLogManagerConfig = new RemoteLogManagerConfig(config) Review comment: The original configs in config may contain implementation specific properties. How do we pass those to RLMM and RLM through RemoteLogManager? ########## File path: core/src/main/scala/kafka/log/remote/RemoteLogManager.scala ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote + +import kafka.cluster.Partition +import kafka.metrics.KafkaMetricsGroup +import kafka.server.KafkaConfig +import kafka.server.epoch.LeaderEpochFileCache +import kafka.utils.Logging +import org.apache.kafka.common._ +import org.apache.kafka.common.record.FileRecords.TimestampAndOffset +import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream} +import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils} +import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager +import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{Closeable, InputStream} +import java.security.{AccessController, PrivilegedAction} +import java.util +import java.util.Optional +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import scala.collection.Set +import scala.jdk.CollectionConverters._ + +/** + * This class is responsible for + * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances. + * - receives any leader and follower replica events and partition stop events and act on them + * - also provides APIs to fetch indexes, metadata about remote log segments. 
+ * + * @param rlmConfig + * @param brokerId + * @param logDir + */ +class RemoteLogManager(rlmConfig: RemoteLogManagerConfig, + brokerId: Int, + logDir: String) extends Logging with Closeable with KafkaMetricsGroup { + + // topic ids received on leadership changes + private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]() Review comment: It seems that we never add to topicPartitionIds? ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -698,15 +713,23 @@ abstract class AbstractFetcherThread(name: String, * brokers and producers. * * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset - * and the current leader's log start offset. + * and the current leader's (local-log-start-offset or) log start offset. Review comment: The above long comment doesn't quite fit into remote storage. We could make it more general that covers both cases. If that's too complicated, perhaps have a separate method just for handling remote storage. ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -90,18 +90,42 @@ abstract class AbstractFetcherThread(name: String, protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] - protected def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] + protected def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] - protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long + /** + * Fetch the leader's local-log-start-offset and the respective leader-epoch tuple from leader replica for the given + * topic partition and the current leader-epoch of leader in this replica. + * + * @param topicPartition + * @param currentLeaderEpoch + * @return + */ + protected def fetchEarliestLocalOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) + + /** + * Fetch the leader's log-start-offset and the respective leader-epoch tuple from leader replica for the given topic + * partition and the current leader-epoch of leader in this replica. + * + * @param topicPartition + * @param currentLeaderEpoch + * @return + */ + protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) - protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long + protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) protected val isOffsetForLeaderEpochSupported: Boolean protected val isTruncationOnFetchSupported: Boolean + protected def buildRemoteLogAuxState(partition: TopicPartition, Review comment: Could we add the javadoc for this one? ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -715,6 +738,87 @@ abstract class AbstractFetcherThread(name: String, } } + /** + * Handle a partition whose offset is out of range and return a new fetch offset. 
+ */ + private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = { + fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch, + (_, leaderLogStartOffset) => { + truncateFullyAndStartAt(topicPartition, leaderLogStartOffset) + leaderLogStartOffset + }, + // In this case, it will fetch from leader's log-start-offset like earlier instead of fetching from + // local-log-start-offset. This handles both the scenarios of whether tiered storage is enabled or not. + // If tiered storage is enabled, the next fetch result of fetching from log-start-offset may result in + // OffsetMovedToTieredStorage error and it will handle building the remote log state. + fetchFromLocalLogStartOffset = false) + } + + /** + * Handle the out of range error. Return false if + * 1) the request succeeded or + * 2) was fenced and this thread haven't received new epoch, + * which means we need not backoff and retry. True if there was a retriable error. + */ + private def handleOutOfRangeError(topicPartition: TopicPartition, + fetchState: PartitionFetchState, + requestEpoch: Optional[Integer]): Boolean = { + try { + val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch) + partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) + info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " + + s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}") + false + } catch { + case _: FencedLeaderEpochException => + onPartitionFenced(topicPartition, requestEpoch) + + case e@(_: UnknownTopicOrPartitionException | + _: UnknownLeaderEpochException | + _: NotLeaderOrFollowerException) => + info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}") + true + + case e: Throwable => + error(s"Error getting offset for partition $topicPartition", e) + true + } + } + + /** + * Handle the offset out of range error or offset moved to tiered storage error. + * + * Return false if + * 1) it is able to build the required remote log auxiliary state or + * 2) was fenced and this thread haven't received new epoch, + * which means we need not backoff and retry. True if there was a retriable error. Review comment: Could we make it clear that the reason that we don't need to backoff and retry is because we move the partition to a failed state? Also, could we put true in a separate line? Ditto for handleOutOfRangeError(). ########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -386,11 +397,147 @@ class ReplicaFetcherThread(name: String, } /** - * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, - * the quota is exceeded and the replica is not in sync. + * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, + * the quota is exceeded and the replica is not in sync. */ private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = { !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded } + /** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. 
+ */ + override protected def buildRemoteLogAuxState(partition: TopicPartition, + currentLeaderEpoch: Int, + leaderLocalLogStartOffset: Long, + epochForLeaderLocalLogStartOffset: Int, + leaderLogStartOffset: Long): Long = { + + def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = { + val previousEpoch = epoch - 1 + // Find the end-offset for the epoch earlier to the given epoch from the leader + val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition()) + .setCurrentLeaderEpoch(currentLeaderEpoch) + .setLeaderEpoch(previousEpoch)) + val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition) + if (maybeEpochEndOffset.isEmpty) { + throw new KafkaException("No response received for partition: " + partition); + } + + val epochEndOffset = maybeEpochEndOffset.get + if (epochEndOffset.errorCode() != Errors.NONE.code()) { + throw Errors.forCode(epochEndOffset.errorCode()).exception() + } + + epochEndOffset + } + + val log = replicaMgr.localLogOrException(partition) + val nextOffset = { + if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) { + if (replicaMgr.remoteLogManager.isEmpty) throw new IllegalStateException("RemoteLogManager is not yet instantiated") + + val rlm = replicaMgr.remoteLogManager.get + + // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache + // until that offset + val previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1 + val targetEpoch: Int = { + // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) + // will have the same epoch. + if (epochForLeaderLocalLogStartOffset == 0) { + epochForLeaderLocalLogStartOffset + } else { + // Fetch the earlier epoch/end-offset(exclusive) from the leader. + val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset) + // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive. + if (earlierEpochEndOffset.endOffset > previousOffsetToLeaderLocalLogStartOffset) { + // Always use the leader epoch from returned earlierEpochEndOffset. + // This gives the respective leader epoch, that will handle any gaps in epochs. + // For ex, leader epoch cache contains: + // leader-epoch start-offset + // 0 20 + // 1 85 + // <2> - gap no messages were appended in this leader epoch. + // 3 90 + // 4 98 + // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3. + // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90. + // So, for offset 89, we should return leader epoch as 1 like below. + earlierEpochEndOffset.leaderEpoch() + } else epochForLeaderLocalLogStartOffset + } + } + + val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset) + + if (maybeRlsm.isPresent) { + val remoteLogSegmentMetadata = maybeRlsm.get() + // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start + // segments from (remoteLogSegmentMetadata.endOffset() + 1) + val nextOffset = remoteLogSegmentMetadata.endOffset() + 1 + val epochStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH) + val epochs = readLeaderEpochCheckpoint(epochStream) + + // Truncate the existing local log before restoring the leader epoch cache and producer snapshots. 
+ truncateFullyAndStartAt(partition, nextOffset) + + log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented) + log.leaderEpochCache.foreach { cache => + epochs.foreach(epochEntry => + // Do not flush to file for each entry. + cache.assign(epochEntry.epoch, epochEntry.startOffset, flushToFile = false) + ) + // Flush the cache to the file. + cache.flush() + } + + debug(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " + + s"with size: ${epochs.size} for $partition") + + // Restore producer snapshot + val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, nextOffset) + val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp"); + // Copy it to snapshot file in atomic manner. + Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT), + tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING) + Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, false) + + // Reload producer snapshots. + log.producerStateManager.reloadSnapshots() + log.loadProducerState(nextOffset, reloadFromCleanShutdown = false) Review comment: This call writes another snapshot, which is unnecessary. Perhaps we could just do producerStateManager.truncateAndReload()? ########## File path: core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala ########## @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote + +import kafka.log._ +import kafka.log.remote.RemoteIndexCache.DirName +import kafka.utils.{CoreUtils, Logging, ShutdownableThread} +import org.apache.kafka.common.errors.CorruptRecordException +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType +import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{File, InputStream} +import java.nio.file.{Files, Path} +import java.util +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +object RemoteIndexCache { + val DirName = "remote-log-index-cache" + val TmpFileSuffix = ".tmp" +} + +class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) { + private val markedForCleanup = new AtomicBoolean(false) + + def lookupOffset(targetOffset: Long): OffsetPosition = { + if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup") + else offsetIndex.lookup(targetOffset) + } + + def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = { + if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup") + + val timestampOffset = timeIndex.lookup(timestamp) + offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset)) + } + + def markForCleanup(): Unit = { + if (!markedForCleanup.getAndSet(true)) { + Array(offsetIndex, timeIndex, txnIndex).foreach(x => + x.renameTo(new File(CoreUtils.replaceSuffix(x.file.getPath, "", UnifiedLog.DeletedFileSuffix)))) + } + } + + def cleanup(): Unit = { + markForCleanup() + CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists())) Review comment: Should we close the indexes too? ########## File path: core/src/main/scala/kafka/log/remote/RemoteLogManager.scala ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote + +import kafka.cluster.Partition +import kafka.metrics.KafkaMetricsGroup +import kafka.server.KafkaConfig +import kafka.server.epoch.LeaderEpochFileCache +import kafka.utils.Logging +import org.apache.kafka.common._ +import org.apache.kafka.common.record.FileRecords.TimestampAndOffset +import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream} +import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils} +import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager +import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{Closeable, InputStream} +import java.security.{AccessController, PrivilegedAction} +import java.util +import java.util.Optional +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import scala.collection.Set +import scala.jdk.CollectionConverters._ + +/** + * This class is responsible for + * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances. + * - receives any leader and follower replica events and partition stop events and act on them + * - also provides APIs to fetch indexes, metadata about remote log segments. + * + * @param rlmConfig + * @param brokerId + * @param logDir + */ +class RemoteLogManager(rlmConfig: RemoteLogManagerConfig, + brokerId: Int, + logDir: String) extends Logging with Closeable with KafkaMetricsGroup { + + // topic ids received on leadership changes + private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]() + + private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager() + private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager() + + private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir) + + private var closed = false + + private[remote] def createRemoteStorageManager(): RemoteStorageManager = { + def createDelegate(classLoader: ClassLoader): RemoteStorageManager = { + classLoader.loadClass(rlmConfig.remoteStorageManagerClassName()) + .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager] + } + + AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] { + private val classPath = rlmConfig.remoteStorageManagerClassPath() + + override def run(): RemoteStorageManager = { + if (classPath != null && classPath.trim.nonEmpty) { + val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader) + val delegate = createDelegate(classLoader) + new ClassLoaderAwareRemoteStorageManager(delegate, classLoader) + } else { + createDelegate(this.getClass.getClassLoader) + } + } + }) + } + + private def configureRSM(): Unit = { + val rsmProps = new util.HashMap[String, Any]() + rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) } + rsmProps.put(KafkaConfig.BrokerIdProp, brokerId) + remoteLogStorageManager.configure(rsmProps) + } + + private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = { + def createDelegate(classLoader: ClassLoader) = { + classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName()) + .getDeclaredConstructor() + .newInstance() + .asInstanceOf[RemoteLogMetadataManager] + } + + AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] { + private val classPath = 
rlmConfig.remoteLogMetadataManagerClassPath + + override def run(): RemoteLogMetadataManager = { + if (classPath != null && classPath.trim.nonEmpty) { + val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader) + val delegate = createDelegate(classLoader) + new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader) + } else { + createDelegate(this.getClass.getClassLoader) + } + } + }) + } + + private def configureRLMM(): Unit = { + val rlmmProps = new util.HashMap[String, Any]() + rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) => rlmmProps.put(k, v) } + rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId) + rlmmProps.put(KafkaConfig.LogDirProp, logDir) + remoteLogMetadataManager.configure(rlmmProps) + } + + def startup(): Unit = { + // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources + // in connecting to the brokers or remote storages. + configureRSM() + configureRLMM() + } + + def storageManager(): RemoteStorageManager = { + remoteLogStorageManager + } + + /** + * Callback to receive any leadership changes for the topic partitions assigned to this broker. If there are no + * existing tasks for a given topic partition then it will assign new leader or follower task else it will convert the + * task to respective target state(leader or follower). + * + * @param partitionsBecomeLeader partitions that have become leaders on this broker. + * @param partitionsBecomeFollower partitions that have become followers on this broker. + * @param topicIds topic name to topic id mappings. + */ + def onLeadershipChange(partitionsBecomeLeader: Set[Partition], + partitionsBecomeFollower: Set[Partition], + topicIds: util.Map[String, Uuid]): Unit = { + debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower") + + // Partitions logs are available when this callback is invoked. + // Compact topics and internal topics are filtered here as they are not supported with tiered storage. + def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = { + // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that. + partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled())) + .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition)) + } + + val followerTopicPartitions = filterPartitions(partitionsBecomeFollower) + val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader) + debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " + + s"and followers: $followerTopicPartitions") + + if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) { + remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava, followerTopicPartitions.asJava) + } + } + + /** + * Stops partitions for copying segments, building indexes and deletes the partition in remote storage if delete flag + * is set as true. + * + * @param topicPartition topic partition to be stopped. + * @param delete flag to indicate whether the given topic partitions to be deleted or not. + */ + def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = { + if (delete) { + // Delete from internal datastructures only if it is to be deleted. 
+ val topicIdPartition = topicPartitionIds.remove(topicPartition) + debug(s"Removed partition: $topicIdPartition from topicPartitionIds") + } + } + + def fetchRemoteLogSegmentMetadata(topicPartition: TopicPartition, + epochForOffset: Int, + offset: Long): Optional[RemoteLogSegmentMetadata] = { + val topicId = topicPartitionIds.get(topicPartition) + + if (topicId == null) { + throw new KafkaException("No topic id registered for topic partition: " + topicPartition) + } + + remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset) + } + + private def lookupTimestamp(rlsMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Option[TimestampAndOffset] = { + val startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset) + + var remoteSegInputStream: InputStream = null + try { + // Search forward for the position of the last offset that is greater than or equal to the target offset + remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata, startPos) + val remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream) + var batch: RecordBatch = null + + def nextBatch(): RecordBatch = { + batch = remoteLogInputStream.nextBatch() + batch + } + + while (nextBatch() != null) { + if (batch.maxTimestamp >= timestamp && batch.lastOffset >= startingOffset) { + batch.iterator.asScala.foreach(record => { + if (record.timestamp >= timestamp && record.offset >= startingOffset) + return Some(new TimestampAndOffset(record.timestamp, record.offset, maybeLeaderEpoch(batch.partitionLeaderEpoch))) + }) + } + } + None + } finally { + Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream") + } + } + + private def maybeLeaderEpoch(leaderEpoch: Int): Optional[Integer] = { + if (leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) + Optional.empty() + else + Optional.of(leaderEpoch) + } + + /** + * Search the message offset in the remote storage based on timestamp and offset. + * + * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules: + * + * - If there is no messages in the remote storage, return None + * - If all the messages in the remote storage have smaller offsets, return None + * - If all the messages in the remote storage have smaller timestamps, return None + * - If all the messages in the remote storage have larger timestamps, or no message in the remote storage has a timestamp Review comment: In the local case, if all the messages in the remote storage have larger timestamps, it seems that we return the timestamp of the first message. ########## File path: clients/src/main/resources/common/message/ListOffsetsResponse.json ########## @@ -31,7 +31,10 @@ // Version 6 enables flexible versions. // // Version 7 is the same as version 6 (KIP-734). - "validVersions": "0-7", + // + // Version 8 enables listing offsets by local timestamp. Review comment: It's not really local timestamp. ########## File path: core/src/main/scala/kafka/log/CleanableIndex.scala ########## @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.io.{Closeable, File} +import java.nio.file.Path + +import org.apache.kafka.common.utils.Utils + +/** + * This class represents a common abstraction for operations like delete and rename of the index files. + * + * @param _file index file + * @see [[AbstractIndex]], [[OffsetIndex]], [[TimeIndex]], [[TransactionIndex]] + */ +abstract class CleanableIndex(@volatile var _file: File) extends Closeable { + + /** + * Rename the file that backs this offset index Review comment: This is not just an offset index. ########## File path: core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala ########## @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote + +import kafka.log._ +import kafka.log.remote.RemoteIndexCache.DirName +import kafka.utils.{CoreUtils, Logging, ShutdownableThread} +import org.apache.kafka.common.errors.CorruptRecordException +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType +import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{File, InputStream} +import java.nio.file.{Files, Path} +import java.util +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +object RemoteIndexCache { + val DirName = "remote-log-index-cache" + val TmpFileSuffix = ".tmp" +} + +class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) { + private val markedForCleanup = new AtomicBoolean(false) + + def lookupOffset(targetOffset: Long): OffsetPosition = { + if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup") Review comment: Is this safe? The entry could be cleaned immediately after this check. ########## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ########## @@ -159,22 +161,39 @@ class ReplicaAlterLogDirsThread(name: String, } } - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = { + override protected def fetchEarliestLocalOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { Review comment: This is not supported yet, right? Should we throw an exception? 
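On that last point, here is a hypothetical sketch of what failing fast could look like. The trait and class names are placeholders rather than the PR's types, and it only assumes kafka-clients on the classpath for `TopicPartition`.

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical sketch of the reviewer's suggestion above -- not the PR's code.
// If fetching the earliest *local* offset has no meaning for the intra-broker
// (alter-log-dirs) fetcher yet, the override could fail fast rather than
// returning a value that merely looks valid.
trait FetcherOffsetLookup {
  def fetchEarliestLocalOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long)
}

class AlterLogDirsOffsetLookup extends FetcherOffsetLookup {
  override def fetchEarliestLocalOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) =
    throw new UnsupportedOperationException(
      s"Fetching the earliest local offset is not supported on the alter-log-dirs path for $topicPartition")
}
```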
########## File path: core/src/main/scala/kafka/log/CleanableIndex.scala ########## @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.io.{Closeable, File} +import java.nio.file.Path + +import org.apache.kafka.common.utils.Utils + +/** + * This class represents a common abstraction for operations like delete and rename of the index files. + * + * @param _file index file + * @see [[AbstractIndex]], [[OffsetIndex]], [[TimeIndex]], [[TransactionIndex]] + */ +abstract class CleanableIndex(@volatile var _file: File) extends Closeable { Review comment: Why is this called CleanableIndex? It's bit confusing given log cleaner. Maybe sth like BaseIndex? ########## File path: clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC; +import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; +import static org.apache.kafka.common.record.Records.MAGIC_OFFSET; +import static org.apache.kafka.common.record.Records.SIZE_OFFSET; + +public class RemoteLogInputStream implements LogInputStream<RecordBatch> { + private final InputStream inputStream; + // LogHeader buffer up to magic. + private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC); + + public RemoteLogInputStream(InputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public RecordBatch nextBatch() throws IOException { + logHeaderBuffer.rewind(); Review comment: rewind() is typically used after the buffer is written. Should we use clear()? 
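For context on the rewind()/clear() comment just above, here is a tiny standalone Scala demo (not code from this PR) of the difference on a `java.nio.ByteBuffer`: rewind() only resets the position and keeps the current limit, whereas clear() also resets the limit to the capacity, which is the conventional way to prepare a buffer for a fresh fill before reading new bytes into it.

```scala
import java.nio.ByteBuffer

// Standalone demo of ByteBuffer.rewind() vs clear() -- not the PR's code.
object BufferResetDemo extends App {
  val buf = ByteBuffer.allocate(8)
  buf.put(Array[Byte](1, 2, 3, 4))
  buf.flip()                       // position = 0, limit = 4: ready to re-read what was written

  buf.rewind()                     // position = 0, limit stays at 4
  println(s"after rewind: limit=${buf.limit()}, capacity=${buf.capacity()}")

  buf.clear()                      // position = 0, limit = capacity = 8: ready for a fresh fill
  println(s"after clear:  limit=${buf.limit()}, capacity=${buf.capacity()}")
}
```

Whether the distinction matters in `RemoteLogInputStream` depends on how `logHeaderBuffer` is used after the read, which is outside the snippet shown here.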
########## File path: core/src/main/scala/kafka/log/UnifiedLog.scala ########## @@ -550,6 +567,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, explicitMetricName(pkgStr, "Log", name, tags) } + def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized { Review comment: No need to pass in reloadFromCleanShutdown since it's always false? ########## File path: core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala ########## @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote + +import kafka.log._ +import kafka.log.remote.RemoteIndexCache.DirName +import kafka.utils.{CoreUtils, Logging, ShutdownableThread} +import org.apache.kafka.common.errors.CorruptRecordException +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType +import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{File, InputStream} +import java.nio.file.{Files, Path} +import java.util +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +object RemoteIndexCache { + val DirName = "remote-log-index-cache" + val TmpFileSuffix = ".tmp" +} + +class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) { + private val markedForCleanup = new AtomicBoolean(false) + + def lookupOffset(targetOffset: Long): OffsetPosition = { + if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup") + else offsetIndex.lookup(targetOffset) + } + + def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = { + if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup") + + val timestampOffset = timeIndex.lookup(timestamp) + offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset)) + } + + def markForCleanup(): Unit = { + if (!markedForCleanup.getAndSet(true)) { + Array(offsetIndex, timeIndex, txnIndex).foreach(x => + x.renameTo(new File(CoreUtils.replaceSuffix(x.file.getPath, "", UnifiedLog.DeletedFileSuffix)))) + } + } + + def cleanup(): Unit = { + markForCleanup() + CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists())) + } + + def close(): Unit = { + Array(offsetIndex, timeIndex, txnIndex).foreach(index => try { + index.close() + } catch { + case _: Exception => // ignore error. + }) + } +} + +/** + * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. 
This is helpful to avoid + * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. + * + * @param maxSize + * @param remoteStorageManager + * @param logDir + */ +//todo-tier make maxSize configurable +class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageManager, logDir: String) extends Logging { + + val cacheDir = new File(logDir, DirName) + @volatile var closed = false + + val expiredIndexes = new LinkedBlockingQueue[Entry]() + val lock = new Object() + + val entries: util.Map[RemoteLogSegmentId, Entry] = new java.util.LinkedHashMap[RemoteLogSegmentId, Entry](maxSize / 2, + 0.75f, true) { + override def removeEldestEntry(eldest: util.Map.Entry[RemoteLogSegmentId, Entry]): Boolean = { + if (this.size() > maxSize) { + val entry = eldest.getValue + // Mark the entries for cleanup, background thread will clean them later. + entry.markForCleanup() + expiredIndexes.add(entry) + true + } else { + false + } + } + } + + private def init(): Unit = { + if (cacheDir.mkdir()) + info(s"Created $cacheDir successfully") + + // Delete any .deleted files remained from the earlier run of the broker. + Files.list(cacheDir.toPath).forEach((path: Path) => { + if (path.endsWith(UnifiedLog.DeletedFileSuffix)) { + Files.deleteIfExists(path) + } + }) + } + + init() Review comment: Should we load up existing files to entries during init()? Otherwise, it's not clear when they will be cleaned up. ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetadataManager { + private final RemoteLogMetadataManager delegate; + private final ClassLoader loader; + + public ClassLoaderAwareRemoteLogMetadataManager(RemoteLogMetadataManager delegate, + ClassLoader loader) { + this.delegate = delegate; + this.loader = loader; + } + + @Override + public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException { + return withClassLoader(() -> delegate.addRemoteLogSegmentMetadata(remoteLogSegmentMetadata)); + } + + @Override + public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException { + return withClassLoader(() -> delegate.updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate)); + } + + @Override + public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, + int epochForOffset, + long offset) throws RemoteStorageException { + return withClassLoader(() -> delegate.remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset)); + } + + @Override + public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, + int leaderEpoch) throws RemoteStorageException { + return withClassLoader(() -> delegate.highestOffsetForEpoch(topicIdPartition, leaderEpoch)); + } + + @Override + public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException { + return withClassLoader(() -> delegate.putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata) + ); + } + + @Override + public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException { + return withClassLoader(() -> delegate.listRemoteLogSegments(topicIdPartition)); + } + + @Override + public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, + int leaderEpoch) throws RemoteStorageException { + return withClassLoader(() -> delegate.listRemoteLogSegments(topicIdPartition, leaderEpoch)); + } + + @Override + public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions, + Set<TopicIdPartition> followerPartitions) { + withTryCatchClassLoader(() -> { + delegate.onPartitionLeadershipChanges(leaderPartitions, followerPartitions); + return null; + }); + } + + @Override + public void onStopPartitions(Set<TopicIdPartition> partitions) { + withTryCatchClassLoader(() -> { + delegate.onStopPartitions(partitions); + return null; + }); + } + + @Override + public void configure(Map<String, ?> configs) { + withTryCatchClassLoader(() -> { + delegate.configure(configs); + return null; + }); + } + + @Override + public 
void close() throws IOException { + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(loader); + try { + delegate.close(); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + + @SuppressWarnings("UnusedReturnValue") + private <T> T withTryCatchClassLoader(Worker<T> worker) { + try { + return withClassLoader(worker); + } catch (final RemoteStorageException ex) { + // ignore, this exception is not thrown by the method. + } + return null; + } + + private <T> T withClassLoader(Worker<T> worker) throws RemoteStorageException { + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(loader); Review comment: Hmm, it's kind of weird to set the class loader on each call. Is this needed? Do other remote storage classes just use the classloader for RLM and RLMM? ########## File path: core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala ########## @@ -169,6 +168,24 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, } } + def findPreviousEpoch(epoch: Int): Option[Int] = { Review comment: previousEpoch? ########## File path: core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala ########## @@ -169,6 +168,24 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, } } + def findPreviousEpoch(epoch: Int): Option[Int] = { + inReadLock(lock) { + Option(epochs.lowerKey(epoch)) + } + } + + def findNextEpoch(epoch: Int): Option[Int] = { + inReadLock(lock) { + Option(epochs.higherKey(epoch)) + } + } + Review comment: nextEpoch? ########## File path: core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala ########## @@ -268,6 +285,28 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, } } + def epochForOffset(offset: Long): Option[Int] = { + inReadLock(lock) { + var previousEpoch = earliestEntry.map(_.epoch) Review comment: Should previousEpoch be initialized to None? ########## File path: core/src/main/scala/kafka/log/remote/RemoteLogManager.scala ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote + +import kafka.cluster.Partition +import kafka.metrics.KafkaMetricsGroup +import kafka.server.KafkaConfig +import kafka.server.epoch.LeaderEpochFileCache +import kafka.utils.Logging +import org.apache.kafka.common._ +import org.apache.kafka.common.record.FileRecords.TimestampAndOffset +import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream} +import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils} +import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager +import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{Closeable, InputStream} +import java.security.{AccessController, PrivilegedAction} +import java.util +import java.util.Optional +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import scala.collection.Set +import scala.jdk.CollectionConverters._ + +/** + * This class is responsible for + * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances. + * - receives any leader and follower replica events and partition stop events and act on them + * - also provides APIs to fetch indexes, metadata about remote log segments. + * + * @param rlmConfig + * @param brokerId + * @param logDir + */ +class RemoteLogManager(rlmConfig: RemoteLogManagerConfig, + brokerId: Int, + logDir: String) extends Logging with Closeable with KafkaMetricsGroup { + + // topic ids received on leadership changes + private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]() + + private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager() + private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager() + + private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir) + + private var closed = false + + private[remote] def createRemoteStorageManager(): RemoteStorageManager = { + def createDelegate(classLoader: ClassLoader): RemoteStorageManager = { + classLoader.loadClass(rlmConfig.remoteStorageManagerClassName()) + .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager] + } + + AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] { + private val classPath = rlmConfig.remoteStorageManagerClassPath() + + override def run(): RemoteStorageManager = { + if (classPath != null && classPath.trim.nonEmpty) { + val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader) + val delegate = createDelegate(classLoader) + new ClassLoaderAwareRemoteStorageManager(delegate, classLoader) + } else { + createDelegate(this.getClass.getClassLoader) + } + } + }) + } + + private def configureRSM(): Unit = { + val rsmProps = new util.HashMap[String, Any]() + rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) } + rsmProps.put(KafkaConfig.BrokerIdProp, brokerId) + remoteLogStorageManager.configure(rsmProps) + } + + private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = { + def createDelegate(classLoader: ClassLoader) = { + classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName()) + .getDeclaredConstructor() + .newInstance() + .asInstanceOf[RemoteLogMetadataManager] + } + + AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] { + private val classPath = 
rlmConfig.remoteLogMetadataManagerClassPath + + override def run(): RemoteLogMetadataManager = { + if (classPath != null && classPath.trim.nonEmpty) { + val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader) + val delegate = createDelegate(classLoader) + new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader) + } else { + createDelegate(this.getClass.getClassLoader) + } + } + }) + } + + private def configureRLMM(): Unit = { + val rlmmProps = new util.HashMap[String, Any]() + rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) => rlmmProps.put(k, v) } + rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId) + rlmmProps.put(KafkaConfig.LogDirProp, logDir) + remoteLogMetadataManager.configure(rlmmProps) + } + + def startup(): Unit = { + // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources + // in connecting to the brokers or remote storages. + configureRSM() + configureRLMM() + } + + def storageManager(): RemoteStorageManager = { + remoteLogStorageManager + } + + /** + * Callback to receive any leadership changes for the topic partitions assigned to this broker. If there are no + * existing tasks for a given topic partition then it will assign new leader or follower task else it will convert the + * task to respective target state(leader or follower). + * + * @param partitionsBecomeLeader partitions that have become leaders on this broker. + * @param partitionsBecomeFollower partitions that have become followers on this broker. + * @param topicIds topic name to topic id mappings. + */ + def onLeadershipChange(partitionsBecomeLeader: Set[Partition], + partitionsBecomeFollower: Set[Partition], + topicIds: util.Map[String, Uuid]): Unit = { + debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower") + + // Partitions logs are available when this callback is invoked. + // Compact topics and internal topics are filtered here as they are not supported with tiered storage. + def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = { + // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that. + partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled())) + .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition)) + } + + val followerTopicPartitions = filterPartitions(partitionsBecomeFollower) + val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader) + debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " + + s"and followers: $followerTopicPartitions") + + if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) { + remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava, followerTopicPartitions.asJava) + } + } + + /** + * Stops partitions for copying segments, building indexes and deletes the partition in remote storage if delete flag + * is set as true. + * + * @param topicPartition topic partition to be stopped. + * @param delete flag to indicate whether the given topic partitions to be deleted or not. + */ + def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = { + if (delete) { + // Delete from internal datastructures only if it is to be deleted. 
+ val topicIdPartition = topicPartitionIds.remove(topicPartition) + debug(s"Removed partition: $topicIdPartition from topicPartitionIds") + } + } + + def fetchRemoteLogSegmentMetadata(topicPartition: TopicPartition, + epochForOffset: Int, + offset: Long): Optional[RemoteLogSegmentMetadata] = { + val topicId = topicPartitionIds.get(topicPartition) + + if (topicId == null) { + throw new KafkaException("No topic id registered for topic partition: " + topicPartition) + } + + remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset) + } + + private def lookupTimestamp(rlsMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Option[TimestampAndOffset] = { + val startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset) + + var remoteSegInputStream: InputStream = null + try { + // Search forward for the position of the last offset that is greater than or equal to the target offset + remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata, startPos) + val remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream) + var batch: RecordBatch = null + + def nextBatch(): RecordBatch = { + batch = remoteLogInputStream.nextBatch() + batch + } + + while (nextBatch() != null) { + if (batch.maxTimestamp >= timestamp && batch.lastOffset >= startingOffset) { + batch.iterator.asScala.foreach(record => { + if (record.timestamp >= timestamp && record.offset >= startingOffset) + return Some(new TimestampAndOffset(record.timestamp, record.offset, maybeLeaderEpoch(batch.partitionLeaderEpoch))) + }) + } + } + None + } finally { + Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream") + } + } + + private def maybeLeaderEpoch(leaderEpoch: Int): Optional[Integer] = { + if (leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) + Optional.empty() + else + Optional.of(leaderEpoch) + } + + /** + * Search the message offset in the remote storage based on timestamp and offset. + * + * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules: + * + * - If there is no messages in the remote storage, return None + * - If all the messages in the remote storage have smaller offsets, return None + * - If all the messages in the remote storage have smaller timestamps, return None + * - If all the messages in the remote storage have larger timestamps, or no message in the remote storage has a timestamp + * the returned offset will be max(the earliest offset in the remote storage, startingOffset) and the timestamp will + * be Message.NoTimestamp. + * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp + * is greater than or equals to the target timestamp and whose offset is greater than or equals to the startingOffset. + * + * @param timestamp The timestamp to search for. + * @param startingOffset The starting offset to search. + * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message. + */ + def findOffsetByTimestamp(tp: TopicPartition, + timestamp: Long, + startingOffset: Long, + leaderEpochCache: LeaderEpochFileCache): Option[TimestampAndOffset] = { + val topicId = topicPartitionIds.get(tp) + if (topicId == null) { + throw new KafkaException("Topic id does not exist for topic partition: " + tp) + } + // Get the respective epoch in which the starting offset exists. 
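+ // Starting from that epoch, the loop below iterates while an epoch is present, listing the remote log segments for each epoch.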
+ var maybeEpoch = leaderEpochCache.epochForOffset(startingOffset); + while (maybeEpoch.nonEmpty) { + remoteLogMetadataManager.listRemoteLogSegments(new TopicIdPartition(topicId, tp), maybeEpoch.get).asScala Review comment: Could we call maybeEpoch.get() once in the loop? ########## File path: core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala ########## @@ -169,6 +168,24 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, } } + def findPreviousEpoch(epoch: Int): Option[Int] = { + inReadLock(lock) { + Option(epochs.lowerKey(epoch)) + } + } + + def findNextEpoch(epoch: Int): Option[Int] = { + inReadLock(lock) { + Option(epochs.higherKey(epoch)) + } + } + + def getEpochEntry(epoch: Int): Option[EpochEntry] = { + inReadLock(lock) { + Option.apply(epochs.get(epoch)) + } + } + Review comment: epochEntry? ########## File path: core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala ########## @@ -268,6 +285,28 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, } } + def epochForOffset(offset: Long): Option[Int] = { + inReadLock(lock) { + var previousEpoch = earliestEntry.map(_.epoch) + epochs.values().asScala.foreach { + case EpochEntry(epoch, startOffset) => + if (startOffset == offset) + return Some(epoch) + if (startOffset > offset) + return previousEpoch + previousEpoch = Some(epoch) + } + previousEpoch + } + } + + def writeTo(leaderEpochCheckpoint: LeaderEpochCheckpoint): LeaderEpochFileCache = { Review comment: This seems unused? ########## File path: core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala ########## @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote + +import kafka.log._ +import kafka.log.remote.RemoteIndexCache.DirName +import kafka.utils.{CoreUtils, Logging, ShutdownableThread} +import org.apache.kafka.common.errors.CorruptRecordException +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType +import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{File, InputStream} +import java.nio.file.{Files, Path} +import java.util +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +object RemoteIndexCache { + val DirName = "remote-log-index-cache" + val TmpFileSuffix = ".tmp" +} + +class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) { + private val markedForCleanup = new AtomicBoolean(false) + + def lookupOffset(targetOffset: Long): OffsetPosition = { + if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup") + else offsetIndex.lookup(targetOffset) + } + + def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = { + if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup") + + val timestampOffset = timeIndex.lookup(timestamp) + offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset)) + } + + def markForCleanup(): Unit = { + if (!markedForCleanup.getAndSet(true)) { + Array(offsetIndex, timeIndex, txnIndex).foreach(x => + x.renameTo(new File(CoreUtils.replaceSuffix(x.file.getPath, "", UnifiedLog.DeletedFileSuffix)))) + } + } + + def cleanup(): Unit = { + markForCleanup() + CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists())) + } + + def close(): Unit = { + Array(offsetIndex, timeIndex, txnIndex).foreach(index => try { + index.close() + } catch { + case _: Exception => // ignore error. + }) + } +} + +/** + * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid + * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. + * + * @param maxSize + * @param remoteStorageManager + * @param logDir + */ +//todo-tier make maxSize configurable +class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageManager, logDir: String) extends Logging { + + val cacheDir = new File(logDir, DirName) + @volatile var closed = false Review comment: This is never read? ########## File path: core/src/main/scala/kafka/log/UnifiedLog.scala ########## @@ -1259,12 +1292,30 @@ class UnifiedLog(@volatile var logStartOffset: Long, latestTimestampAndOffset.offset, epochOptional)) } else { - // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides - // constant time access while being safe to use with concurrent collections unlike `toArray`. - val segmentsCopy = logSegments.toBuffer // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. 
- val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp) - targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset)) + val remoteOffset = if (remoteLogEnabled()) { + if (remoteLogManager.isEmpty) { + throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled."); + } + if (leaderEpochCache.isEmpty) { Review comment: This seems an indirect way to check message version. Should we just check that explicitly? ########## File path: core/src/main/scala/kafka/log/UnifiedLog.scala ########## @@ -1259,12 +1292,30 @@ class UnifiedLog(@volatile var logStartOffset: Long, latestTimestampAndOffset.offset, epochOptional)) } else { - // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides - // constant time access while being safe to use with concurrent collections unlike `toArray`. - val segmentsCopy = logSegments.toBuffer // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. - val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp) - targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset)) + val remoteOffset = if (remoteLogEnabled()) { + if (remoteLogManager.isEmpty) { + throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled."); + } + if (leaderEpochCache.isEmpty) { + throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.") + } + + remoteLogManager.get.findOffsetByTimestamp(topicPartition, targetTimestamp, logStartOffset, leaderEpochCache.get) + } else None + + if (remoteOffset.nonEmpty) { + remoteOffset + } else { + // If it is not found in remote storage, search in the local storage starting with local log start offset. + + // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides + // constant time access while being safe to use with concurrent collections unlike `toArray`. + val segmentsCopy = logSegments.toBuffer + + val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp) + targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, localLogStartOffset)) Review comment: Hmm, localLogStartOffset could change after the call to remoteLogManager. ########## File path: clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC; +import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; +import static org.apache.kafka.common.record.Records.MAGIC_OFFSET; +import static org.apache.kafka.common.record.Records.OFFSET_OFFSET; +import static org.apache.kafka.common.record.Records.SIZE_OFFSET; + +public class RemoteLogInputStream implements LogInputStream<RecordBatch> { + private final InputStream is; + private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC); + + public RemoteLogInputStream(InputStream is) { + this.is = is; + } + + @Override + public RecordBatch nextBatch() throws IOException { + logHeaderBuffer.rewind(); + Utils.readFully(is, logHeaderBuffer); + + if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC) + return null; + + logHeaderBuffer.rewind(); + logHeaderBuffer.getLong(OFFSET_OFFSET); + int size = logHeaderBuffer.getInt(SIZE_OFFSET); + + // V0 has the smallest overhead, stricter checking is done later + if (size < LegacyRecord.RECORD_OVERHEAD_V0) + throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " + + "overhead (%d).", size, LegacyRecord.RECORD_OVERHEAD_V0)); + + byte magic = logHeaderBuffer.get(MAGIC_OFFSET); + ByteBuffer buffer = ByteBuffer.allocate(size + LOG_OVERHEAD); + System.arraycopy(logHeaderBuffer.array(), 0, buffer.array(), 0, logHeaderBuffer.limit()); Review comment: > To use array(), we have to first check hasArray(). It's probably simpler to do buffer.put(logHeaderBuffer). Was this comment addressed? ########## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ########## @@ -311,4 +330,13 @@ class ReplicaAlterLogDirsThread(name: String, } } + override protected def buildRemoteLogAuxState(partition: TopicPartition, currentLeaderEpoch: Int, fetchOffset: Long, epochForFetchOffset: Int, leaderLogStartOffset: Long): Long = { + // JBOD is not supported with tiered storage. Review comment: Should we just throw UnsupportedException? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org