This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bbbc0cf7931 MINOR: Fix format in CoordinatorLoaderImpl (#20538)
bbbc0cf7931 is described below
commit bbbc0cf79313a083b40cd206c132fd66dccd3ff5
Author: David Jacot <[email protected]>
AuthorDate: Wed Sep 17 11:13:28 2025 +0200
MINOR: Fix format in CoordinatorLoaderImpl (#20538)
The format of the code in `CoordinatorLoaderImpl` in inconsistent with
the rest of the code in the package. This small PR fixes it.
Reviewers: Ken Huang <[email protected]>, TengYao Chi
<[email protected]>, Andrew Schofield <[email protected]>, Sean
Quah <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../common/runtime/CoordinatorLoaderImpl.java | 79 +++++++++++-----------
1 file changed, 38 insertions(+), 41 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
index 078dad36ef8..6613ce25fc8 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
@@ -62,11 +62,11 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
private final KafkaScheduler scheduler = new KafkaScheduler(1);
public CoordinatorLoaderImpl(
- Time time,
- Function<TopicPartition, Optional<UnifiedLog>>
partitionLogSupplier,
- Function<TopicPartition, Optional<Long>>
partitionLogEndOffsetSupplier,
- Deserializer<T> deserializer,
- int loadBufferSize
+ Time time,
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
+ Deserializer<T> deserializer,
+ int loadBufferSize
) {
this.time = time;
this.partitionLogSupplier = partitionLogSupplier;
@@ -89,7 +89,7 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
long startTimeMs = time.milliseconds();
try {
ScheduledFuture<?> result =
scheduler.scheduleOnce(String.format("Load coordinator from %s", tp),
- () -> doLoad(tp, coordinator, future, startTimeMs));
+ () -> doLoad(tp, coordinator, future, startTimeMs));
if (result.isCancelled()) {
future.completeExceptionally(new RuntimeException("Coordinator
loader is closed."));
}
@@ -100,17 +100,17 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
}
private void doLoad(
- TopicPartition tp,
- CoordinatorPlayback<T> coordinator,
- CompletableFuture<LoadSummary> future,
- long startTimeMs
+ TopicPartition tp,
+ CoordinatorPlayback<T> coordinator,
+ CompletableFuture<LoadSummary> future,
+ long startTimeMs
) {
long schedulerQueueTimeMs = time.milliseconds() - startTimeMs;
try {
Optional<UnifiedLog> logOpt = partitionLogSupplier.apply(tp);
if (logOpt.isEmpty()) {
future.completeExceptionally(new NotLeaderOrFollowerException(
- "Could not load records from " + tp + " because the
log does not exist."));
+ "Could not load records from " + tp + " because the log
does not exist."));
return;
}
@@ -142,8 +142,7 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
if (logEndOffset(tp) == -1L) {
future.completeExceptionally(new NotLeaderOrFollowerException(
- String.format("Stopped loading records from %s because
the partition is not online or is no longer the leader.", tp)
- ));
+ String.format("Stopped loading records from %s because the
partition is not online or is no longer the leader.", tp)));
} else if (isRunning.get()) {
future.complete(new LoadSummary(startTimeMs, endTimeMs,
schedulerQueueTimeMs, stats.numRecords, stats.numBytes));
} else {
@@ -186,7 +185,7 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
if (buffer.capacity() < bytesNeeded) {
if (loadBufferSize < bytesNeeded) {
LOG.warn("Loaded metadata from {} with buffer larger ({}
bytes) than" +
- " configured buffer size ({} bytes).", tp,
bytesNeeded, loadBufferSize);
+ " configured buffer size ({} bytes).", tp,
bytesNeeded, loadBufferSize);
}
buffer = ByteBuffer.allocate(bytesNeeded);
@@ -202,15 +201,14 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
}
private ReplayResult processMemoryRecords(
- TopicPartition tp,
- UnifiedLog log,
- MemoryRecords memoryRecords,
- CoordinatorPlayback<T> coordinator,
- LoadStats loadStats,
- long currentOffset,
- long previousHighWatermark
+ TopicPartition tp,
+ UnifiedLog log,
+ MemoryRecords memoryRecords,
+ CoordinatorPlayback<T> coordinator,
+ LoadStats loadStats,
+ long currentOffset,
+ long previousHighWatermark
) {
-
for (MutableRecordBatch batch : memoryRecords.batches()) {
if (batch.isControlBatch()) {
for (Record record : batch) {
@@ -220,8 +218,8 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
if (controlRecord == ControlRecordType.COMMIT) {
if (LOG.isTraceEnabled()) {
LOG.trace("Replaying end transaction marker from
{} at offset {} to commit" +
- " transaction with producer id {}
and producer epoch {}.",
- tp, record.offset(), batch.producerId(),
batch.producerEpoch());
+ " transaction with producer id {} and producer
epoch {}.",
+ tp, record.offset(), batch.producerId(),
batch.producerEpoch());
}
coordinator.replayEndTransactionMarker(
batch.producerId(),
@@ -231,8 +229,8 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
} else if (controlRecord == ControlRecordType.ABORT) {
if (LOG.isTraceEnabled()) {
LOG.trace("Replaying end transaction marker from
{} at offset {} to abort" +
- " transaction with producer id {}
and producer epoch {}.",
- tp, record.offset(), batch.producerId(),
batch.producerEpoch());
+ " transaction with producer id {} and producer
epoch {}.",
+ tp, record.offset(), batch.producerId(),
batch.producerEpoch());
}
coordinator.replayEndTransactionMarker(
batch.producerId(),
@@ -250,7 +248,7 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
coordinatorRecordOpt =
Optional.ofNullable(deserializer.deserialize(record.key(), record.value()));
} catch (Deserializer.UnknownRecordTypeException ex) {
LOG.warn("Unknown record type {} while loading offsets
and group metadata from {}." +
- " Ignoring it. It could be a left over from an
aborted upgrade.", ex.unknownType(), tp);
+ " Ignoring it. It could be a left over from an
aborted upgrade.", ex.unknownType(), tp);
} catch (RuntimeException ex) {
String msg = String.format("Deserializing record %s
from %s failed.", record, tp);
LOG.error(msg, ex);
@@ -261,18 +259,18 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Replaying record {} from {} at
offset {} with producer id {}" +
- " and producer epoch {}.",
coordinatorRecord, tp, record.offset(), batch.producerId(),
batch.producerEpoch());
+ " and producer epoch {}.",
coordinatorRecord, tp, record.offset(), batch.producerId(),
batch.producerEpoch());
}
coordinator.replay(
- record.offset(),
- batch.producerId(),
- batch.producerEpoch(),
- coordinatorRecord
+ record.offset(),
+ batch.producerId(),
+ batch.producerEpoch(),
+ coordinatorRecord
);
} catch (RuntimeException ex) {
String msg = String.format("Replaying record %s
from %s at offset %d with producer id %d and" +
- " producer epoch %d failed.",
coordinatorRecord, tp, record.offset(),
- batch.producerId(), batch.producerEpoch());
+ " producer epoch %d failed.",
coordinatorRecord, tp, record.offset(),
+ batch.producerId(), batch.producerEpoch());
LOG.error(msg, ex);
throw new RuntimeException(msg, ex);
}
@@ -320,14 +318,13 @@ public class CoordinatorLoaderImpl<T> implements
CoordinatorLoader<T> {
@Override
public String toString() {
- return "LoadStats{" +
- "numRecords=" + numRecords +
- ", numBytes=" + numBytes +
- ", readAtLeastOneRecord=" + readAtLeastOneRecord +
- '}';
+ return "LoadStats(" +
+ "numRecords=" + numRecords +
+ ", numBytes=" + numBytes +
+ ", readAtLeastOneRecord=" + readAtLeastOneRecord +
+ ')';
}
}
- private record ReplayResult(long nextOffset, long highWatermark) {
- }
+ private record ReplayResult(long nextOffset, long highWatermark) { }
}