This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b2da186f214 KAFKA-15265: Remote fetch throttle metrics (#16087) b2da186f214 is described below commit b2da186f21490ddf9922a7ad834e830deee1f7a8 Author: Abhijeet Kumar <abhijeet.cse....@gmail.com> AuthorDate: Wed Jul 3 14:11:18 2024 +0530 KAFKA-15265: Remote fetch throttle metrics (#16087) As part of [KIP-956](https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas), we have added quota for remote fetches from remote storage. In this PR, we are adding the following metrics for remote fetch throttling. remote-fetch-throttle-time-avg : The average time in millis remote fetches was throttled by a broker remote-fetch-throttle-time-max : The max time in millis remote fetches was throttled by a broker Reviewers: Kamal Chandraprakash<kamal.chandraprak...@gmail.com>, Luke Chen <show...@gmail.com>, Satish Duggana <sati...@apache.org> --- .../java/kafka/log/remote/RemoteLogManager.java | 17 ++++++-- .../kafka/log/remote/quota/RLMQuotaManager.java | 7 ++-- .../kafka/log/remote/quota/RLMQuotaMetrics.java | 47 ++++++++++++++++++++++ .../main/scala/kafka/server/ReplicaManager.scala | 6 ++- .../kafka/log/remote/RemoteLogManagerTest.java | 10 ++--- .../log/remote/quota/RLMQuotaManagerTest.java | 16 ++++---- .../unit/kafka/server/ReplicaManagerTest.scala | 43 +++++++++++++++++--- 7 files changed, 121 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 5aedd1bbfb3..2d589337e71 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -21,6 +21,7 @@ import kafka.cluster.Partition; import kafka.log.UnifiedLog; import kafka.log.remote.quota.RLMQuotaManager; import kafka.log.remote.quota.RLMQuotaManagerConfig; +import kafka.log.remote.quota.RLMQuotaMetrics; import kafka.server.BrokerTopicStats; import kafka.server.QuotaType; import kafka.server.StopPartition; @@ -34,6 +35,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; @@ -137,6 +139,7 @@ import java.util.stream.Stream; import scala.Option; import scala.collection.JavaConverters; +import static kafka.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS; import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC; @@ -171,6 +174,7 @@ public class RemoteLogManager implements Closeable { private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition(); private final RLMQuotaManager rlmCopyQuotaManager; private final RLMQuotaManager rlmFetchQuotaManager; + private final Sensor fetchThrottleTimeSensor; private final RemoteIndexCache indexCache; private final RemoteStorageThreadPool remoteStorageReaderThreadPool; @@ -229,6 +233,9 @@ public class RemoteLogManager implements Closeable { rlmCopyQuotaManager = createRLMCopyQuotaManager(); rlmFetchQuotaManager = createRLMFetchQuotaManager(); + fetchThrottleTimeSensor = new RLMQuotaMetrics(metrics, "remote-fetch-throttle-time", RemoteLogManager.class.getSimpleName(), + "The %s time in millis remote fetches was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor(); + indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs(); rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize()); @@ -286,8 +293,12 @@ public class RemoteLogManager implements Closeable { "Tracking fetch byte-rate for Remote Log Manager", time); } - public boolean isRemoteLogFetchQuotaExceeded() { - return rlmFetchQuotaManager.isQuotaExceeded(); + public long getFetchThrottleTimeMs() { + return rlmFetchQuotaManager.getThrottleTimeMs(); + } + + public Sensor fetchThrottleTimeSensor() { + return fetchThrottleTimeSensor; } static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { @@ -804,7 +815,7 @@ public class RemoteLogManager implements Closeable { copyQuotaManagerLock.lock(); try { - while (rlmCopyQuotaManager.isQuotaExceeded()) { + while (rlmCopyQuotaManager.getThrottleTimeMs() > 0) { logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available."); // If the thread gets interrupted while waiting, the InterruptedException is thrown // back to the caller. It's important to note that the task being executed is already diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java index 68513dd4215..0f695a21e63 100644 --- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java +++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java @@ -18,6 +18,7 @@ package kafka.log.remote.quota; import kafka.server.QuotaType; import kafka.server.SensorAccess; +import kafka.utils.QuotaUtils; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.KafkaMetric; @@ -80,16 +81,16 @@ public class RLMQuotaManager { } } - public boolean isQuotaExceeded() { + public long getThrottleTimeMs() { Sensor sensorInstance = sensor(); try { sensorInstance.checkQuotas(); } catch (QuotaViolationException qve) { LOGGER.debug("Quota violated for sensor ({}), metric: ({}), metric-value: ({}), bound: ({})", sensorInstance.name(), qve.metric().metricName(), qve.value(), qve.bound()); - return true; + return QuotaUtils.throttleTime(qve, time.milliseconds()); } - return false; + return 0L; } public void record(double value) { diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java b/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java new file mode 100644 index 00000000000..96dd6b1b7e6 --- /dev/null +++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; + +import kafka.server.SensorAccess; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import scala.runtime.BoxedUnit; + +public class RLMQuotaMetrics { + + private final Sensor sensor; + + public RLMQuotaMetrics(Metrics metrics, String name, String group, String descriptionFormat, long expirationTime) { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + SensorAccess sensorAccess = new SensorAccess(lock, metrics); + this.sensor = sensorAccess.getOrCreate(name, expirationTime, s -> { + s.add(metrics.metricName(name + "-avg", group, String.format(descriptionFormat, "average")), new Avg()); + s.add(metrics.metricName(name + "-max", group, String.format(descriptionFormat, "maximum")), new Max()); + return BoxedUnit.UNIT; + }); + } + + public Sensor sensor() { + return sensor; + } +} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8ffc7eae585..6ebd151a032 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1762,7 +1762,11 @@ class ReplicaManager(val config: KafkaConfig, createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset, new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage")) } else { - val fetchDataInfo = if (remoteLogManager.get.isRemoteLogFetchQuotaExceeded) { + val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs() + val fetchDataInfo = if (throttleTimeMs > 0) { + // Record the throttle time for the remote log fetches + remoteLogManager.get.fetchThrottleTimeSensor().record(throttleTimeMs, time.milliseconds()) + // We do not want to send an exception in a LogReadResult response (like we do in other cases when we send // UnknownOffsetMetadata), because it is classified as an error in reading the data, and a response is // immediately sent back to the client. Instead, we want to serve data for the other topic partitions of the diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 5b8030b6756..8cf190785f5 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -2870,7 +2870,7 @@ public class RemoteLogManagerTest { assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog)); // Verify quota check was performed - verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded(); + verify(rlmCopyQuotaManager, times(1)).getThrottleTimeMs(); // Verify bytes to copy was recorded with the quota manager verify(rlmCopyQuotaManager, times(1)).record(10); @@ -2893,7 +2893,7 @@ public class RemoteLogManagerTest { Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds); // Ensure the copy operation is waiting for quota to be available TestUtils.waitForCondition(() -> { - verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded(); + verify(rlmCopyQuotaManager, atLeast(1)).getThrottleTimeMs(); return true; }, "Quota exceeded check did not happen"); // Verify RLM is able to shut down @@ -2958,7 +2958,7 @@ public class RemoteLogManagerTest { when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture); when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty()); - when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded); + when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaExceeded ? 1000L : 0L); doNothing().when(rlmCopyQuotaManager).record(anyInt()); RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); @@ -3029,8 +3029,8 @@ public class RemoteLogManagerTest { when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture); when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty()); - // After the first call, isQuotaExceeded should return true - when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true); + // After the first call, getThrottleTimeMs should return non-zero throttle time + when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(0L, 1000L); doNothing().when(rlmCopyQuotaManager).record(anyInt()); RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); diff --git a/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java b/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java index 2d639d4f241..ce6c3ce3ecc 100644 --- a/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java +++ b/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java @@ -47,19 +47,19 @@ public class RLMQuotaManagerTest { RLMQuotaManager quotaManager = new RLMQuotaManager( new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time); - assertFalse(quotaManager.isQuotaExceeded()); + assertEquals(0L, quotaManager.getThrottleTimeMs()); quotaManager.record(500); // Move clock by 1 sec, quota is violated moveClock(1); - assertTrue(quotaManager.isQuotaExceeded()); + assertEquals(9_000L, quotaManager.getThrottleTimeMs()); // Move clock by another 8 secs, quota is still violated for the window moveClock(8); - assertTrue(quotaManager.isQuotaExceeded()); + assertEquals(1_000L, quotaManager.getThrottleTimeMs()); // Move clock by 1 sec, quota is no more violated moveClock(1); - assertFalse(quotaManager.isQuotaExceeded()); + assertEquals(0L, quotaManager.getThrottleTimeMs()); } @Test @@ -67,9 +67,9 @@ public class RLMQuotaManagerTest { RLMQuotaManager quotaManager = new RLMQuotaManager( new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time); - assertFalse(quotaManager.isQuotaExceeded()); + assertFalse(quotaManager.getThrottleTimeMs() > 0); quotaManager.record(51); - assertTrue(quotaManager.isQuotaExceeded()); + assertTrue(quotaManager.getThrottleTimeMs() > 0); Map<MetricName, KafkaMetric> fetchQuotaMetrics = metrics.metrics().entrySet().stream() .filter(entry -> entry.getKey().name().equals("byte-rate") && entry.getKey().group().equals(QUOTA_TYPE.toString())) @@ -88,7 +88,7 @@ public class RLMQuotaManagerTest { // Update quota to 60, quota is no more violated Quota quota60Bytes = new Quota(60, true); quotaManager.updateQuota(quota60Bytes); - assertFalse(quotaManager.isQuotaExceeded()); + assertFalse(quotaManager.getThrottleTimeMs() > 0); // Verify quota metrics were updated Map<MetricName, MetricConfig> configForQuotaMetricsAfterFirstUpdate = extractMetricConfig(fetchQuotaMetrics); @@ -100,7 +100,7 @@ public class RLMQuotaManagerTest { // Update quota to 40, quota is violated again Quota quota40Bytes = new Quota(40, true); quotaManager.updateQuota(quota40Bytes); - assertTrue(quotaManager.isQuotaExceeded()); + assertTrue(quotaManager.getThrottleTimeMs() > 0); // Verify quota metrics were updated assertNotEquals(configForQuotaMetricsAfterFirstUpdate, extractMetricConfig(fetchQuotaMetrics)); diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a1f3ce002c2..b1fffa88518 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -23,6 +23,8 @@ import kafka.cluster.PartitionTest.MockPartitionListener import kafka.cluster.{BrokerEndPoint, Partition} import kafka.log._ import kafka.log.remote.RemoteLogManager +import kafka.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS +import kafka.log.remote.quota.RLMQuotaMetrics import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile} import kafka.server.epoch.util.MockBlockingSender @@ -116,6 +118,8 @@ class ReplicaManagerTest { private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _ private var brokerTopicStats: BrokerTopicStats = _ private val transactionSupportedOperation = genericError + private val quotaExceededThrottleTime = 1000 + private val quotaAvailableThrottleTime = 0 // Constants defined for readability private val zkVersion = 0 @@ -133,6 +137,13 @@ class ReplicaManagerTest { alterPartitionManager = mock(classOf[AlterPartitionManager]) quotaManager = QuotaFactory.instantiate(config, metrics, time, "") mockRemoteLogManager = mock(classOf[RemoteLogManager]) + when(mockRemoteLogManager.fetchThrottleTimeSensor()).thenReturn( + new RLMQuotaMetrics(metrics, + "remote-fetch-throttle-time", + classOf[RemoteLogManager].getSimpleName, + "The %s time in millis remote fetches was throttled by a broker", + INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS) + .sensor()) addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) // Anytime we try to verify, just automatically run the callback as though the transaction was verified. @@ -3357,7 +3368,8 @@ class ReplicaManagerTest { defaultTopicRemoteLogStorageEnable: Boolean = true, setupLogDirMetaProperties: Boolean = false, directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP, - buildRemoteLogAuxState: Boolean = false + buildRemoteLogAuxState: Boolean = false, + remoteFetchQuotaExceeded: Option[Boolean] = None ): ReplicaManager = { val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect) val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath @@ -3411,6 +3423,15 @@ class ReplicaManagerTest { when(metadataCache.contains(new TopicPartition(topic, 0))).thenReturn(true) + if (remoteFetchQuotaExceeded.isDefined) { + assertFalse(remoteLogManager.isDefined) + if (remoteFetchQuotaExceeded.get) { + when(mockRemoteLogManager.getFetchThrottleTimeMs()).thenReturn(quotaExceededThrottleTime) + } else { + when(mockRemoteLogManager.getFetchThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime) + } + } + // Transactional appends attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error. KafkaRequestHandler.setBypassThreadCheck(true) @@ -3967,7 +3988,7 @@ class ReplicaManagerTest { val tp0 = new TopicPartition(topic, 0) val tidp0 = new TopicIdPartition(topicId, tp0) // create a replicaManager with remoteLog enabled - val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false)) try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -4022,7 +4043,7 @@ class ReplicaManagerTest { val tp0 = new TopicPartition(topic, 0) val tidp0 = new TopicIdPartition(topicId, tp0) // create a replicaManager with remoteLog enabled - val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog= true) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog= true, remoteFetchQuotaExceeded = Some(false)) try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -6713,7 +6734,7 @@ class ReplicaManagerTest { @Test def testRemoteReadQuotaExceeded(): Unit = { - when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true) + when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaExceededThrottleTime) val tp0 = new TopicPartition(topic, 0) val tpId0 = new TopicIdPartition(topicId, tp0) @@ -6727,11 +6748,17 @@ class ReplicaManagerTest { assertFalse(fetchInfo.firstEntryIncomplete) assertFalse(fetchInfo.abortedTransactions.isPresent) assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent) + + val allMetrics = metrics.metrics() + val avgMetric = allMetrics.get(metrics.metricName("remote-fetch-throttle-time-avg", "RemoteLogManager")) + val maxMetric = allMetrics.get(metrics.metricName("remote-fetch-throttle-time-max", "RemoteLogManager")) + assertEquals(quotaExceededThrottleTime, avgMetric.metricValue.asInstanceOf[Double].toLong) + assertEquals(quotaExceededThrottleTime, maxMetric.metricValue.asInstanceOf[Double].toLong) } @Test def testRemoteReadQuotaNotExceeded(): Unit = { - when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false) + when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaAvailableThrottleTime) val tp0 = new TopicPartition(topic, 0) val tpId0 = new TopicIdPartition(topicId, tp0) @@ -6745,6 +6772,12 @@ class ReplicaManagerTest { assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) assertEquals(MemoryRecords.EMPTY, fetchInfo.records) assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent) + + val allMetrics = metrics.metrics() + val avgMetric = allMetrics.get(metrics.metricName("remote-fetch-throttle-time-avg", "RemoteLogManager")) + val maxMetric = allMetrics.get(metrics.metricName("remote-fetch-throttle-time-max", "RemoteLogManager")) + assertEquals(Double.NaN, avgMetric.metricValue) + assertEquals(Double.NaN, maxMetric.metricValue) } private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {