This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e69a3110687 KAFKA-19076 replace `String` by `Supplier<String>` for
UnifiedLog#maybeHandleIOException (#19392)
e69a3110687 is described below
commit e69a31106879dc5eaf8e80f1fe0c05422488df49
Author: Nick Guo <[email protected]>
AuthorDate: Mon Apr 7 00:43:44 2025 +0800
KAFKA-19076 replace `String` by `Supplier<String>` for
UnifiedLog#maybeHandleIOException (#19392)
jira: https://issues.apache.org/jira/browse/KAFKA-19076
The message is only used when the function encounters an error, so the error
message should be created lazily.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/storage/internals/log/UnifiedLog.java | 27 +++++++++++-----------
1 file changed, 14 insertions(+), 13 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index b9ea2dfd856..dcef6929d19 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -83,6 +83,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -922,7 +923,7 @@ public class UnifiedLog implements AutoCloseable {
localLog.checkIfMemoryMappedBufferClosed();
producerExpireCheck.cancel(true);
maybeHandleIOException(
- "Error while renaming dir for " + topicPartition() + " in
dir " + dir().getParent(),
+ () -> "Error while renaming dir for " + topicPartition() +
" in dir " + dir().getParent(),
() -> {
// We take a snapshot at the last written offset to
hopefully avoid the need to scan the log
// after restarting and to ensure that we cannot
inadvertently hit the upgrade optimization
@@ -945,7 +946,7 @@ public class UnifiedLog implements AutoCloseable {
public void renameDir(String name, boolean shouldReinitialize) {
synchronized (lock) {
maybeHandleIOException(
- "Error while renaming dir for " + topicPartition() + " in
log dir " + dir().getParent(),
+ () -> "Error while renaming dir for " + topicPartition() +
" in log dir " + dir().getParent(),
() -> {
// Flush partitionMetadata file before initializing
again
maybeFlushMetadataFile();
@@ -1087,7 +1088,7 @@ public class UnifiedLog implements AutoCloseable {
// they are valid, insert them in the log
synchronized (lock) {
return maybeHandleIOException(
- "Error while appending records to " + topicPartition()
+ " in dir " + dir().getParent(),
+ () -> "Error while appending records to " +
topicPartition() + " in dir " + dir().getParent(),
() -> {
MemoryRecords validRecords = trimmedRecords;
localLog.checkIfMemoryMappedBufferClosed();
@@ -1300,7 +1301,7 @@ public class UnifiedLog implements AutoCloseable {
// The deleteRecordsOffset may be lost only if all in-sync replicas of
this broker are shutdown
// in an unclean manner within
log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is
low.
return maybeHandleIOException(
- "Exception while increasing log start offset for " +
topicPartition() + " to " + newLogStartOffset + " in dir " + dir().getParent(),
+ () -> "Exception while increasing log start offset for " +
topicPartition() + " to " + newLogStartOffset + " in dir " + dir().getParent(),
() -> {
synchronized (lock) {
if (newLogStartOffset > highWatermark()) {
@@ -1613,7 +1614,7 @@ public class UnifiedLog implements AutoCloseable {
*/
public OffsetResultHolder fetchOffsetByTimestamp(long targetTimestamp,
Optional<AsyncOffsetReader> remoteOffsetReader) {
return maybeHandleIOException(
- "Error while fetching offset by timestamp for " +
topicPartition() + " in dir " + dir().getParent(),
+ () -> "Error while fetching offset by timestamp for " +
topicPartition() + " in dir " + dir().getParent(),
() -> {
logger.debug("Searching offset for timestamp {}.",
targetTimestamp);
@@ -1831,7 +1832,7 @@ public class UnifiedLog implements AutoCloseable {
}
private int deleteSegments(List<LogSegment> deletable,
SegmentDeletionReason reason) {
- return maybeHandleIOException("Error while deleting segments for " +
topicPartition() + " in dir " + dir().getParent(),
+ return maybeHandleIOException(() -> "Error while deleting segments for
" + topicPartition() + " in dir " + dir().getParent(),
() -> {
int numToDelete = deletable.size();
if (numToDelete > 0) {
@@ -2138,7 +2139,7 @@ public class UnifiedLog implements AutoCloseable {
long flushOffset = includingOffset ? offset + 1 : offset;
String includingOffsetStr = includingOffset ? "inclusive" :
"exclusive";
maybeHandleIOException(
- "Error while flushing log for " + topicPartition() + " in dir
" + dir().getParent() + " with offset " + offset +
+ () -> "Error while flushing log for " + topicPartition() + "
in dir " + dir().getParent() + " with offset " + offset +
" (" + includingOffsetStr + ") and recovery point " + offset,
() -> {
if (flushOffset > localLog.recoveryPoint()) {
@@ -2158,7 +2159,7 @@ public class UnifiedLog implements AutoCloseable {
*/
public void delete() {
maybeHandleIOException(
- "Error while deleting log for " + topicPartition() + " in dir " +
dir().getParent(),
+ () -> "Error while deleting log for " + topicPartition() + " in
dir " + dir().getParent(),
() -> {
synchronized (lock) {
localLog.checkIfMemoryMappedBufferClosed();
@@ -2204,7 +2205,7 @@ public class UnifiedLog implements AutoCloseable {
// visible for testing
public void flushProducerStateSnapshot(Path snapshot) {
maybeHandleIOException(
- "Error while deleting producer state snapshot " + snapshot + "
for " + topicPartition() + " in dir " + dir().getParent(),
+ () -> "Error while deleting producer state snapshot " +
snapshot + " for " + topicPartition() + " in dir " + dir().getParent(),
() -> {
Utils.flushFileIfExists(snapshot);
return null;
@@ -2219,7 +2220,7 @@ public class UnifiedLog implements AutoCloseable {
*/
public boolean truncateTo(long targetOffset) {
return maybeHandleIOException(
- "Error while truncating log to offset " + targetOffset + " for
" + topicPartition() + " in dir " + dir().getParent(),
+ () -> "Error while truncating log to offset " + targetOffset +
" for " + topicPartition() + " in dir " + dir().getParent(),
() -> {
if (targetOffset < 0) {
throw new IllegalArgumentException("Cannot truncate
partition " + topicPartition() + " to a negative offset (" + targetOffset +
").");
@@ -2263,7 +2264,7 @@ public class UnifiedLog implements AutoCloseable {
*/
public void truncateFullyAndStartAt(long newOffset, Optional<Long>
logStartOffsetOpt) {
maybeHandleIOException(
- "Error while truncating the entire log for " +
topicPartition() + " in dir " + dir().getParent(),
+ () -> "Error while truncating the entire log for " +
topicPartition() + " in dir " + dir().getParent(),
() -> {
logger.debug("Truncate and start at offset {},
logStartOffset: {}", newOffset, logStartOffsetOpt.orElse(newOffset));
synchronized (lock) {
@@ -2370,8 +2371,8 @@ public class UnifiedLog implements AutoCloseable {
metricNames.clear();
}
- private <T> T maybeHandleIOException(String msg, StorageAction<T,
IOException> fun) throws KafkaStorageException {
- return LocalLog.maybeHandleIOException(logDirFailureChannel(),
parentDir(), () -> msg, fun);
+ private <T> T maybeHandleIOException(Supplier<String> msg,
StorageAction<T, IOException> fun) throws KafkaStorageException {
+ return LocalLog.maybeHandleIOException(logDirFailureChannel(),
parentDir(), msg, fun);
}
public List<LogSegment> splitOverflowedSegment(LogSegment segment) throws
IOException {