This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bbbc0cf7931 MINOR: Fix format in CoordinatorLoaderImpl (#20538)
bbbc0cf7931 is described below

commit bbbc0cf79313a083b40cd206c132fd66dccd3ff5
Author: David Jacot <[email protected]>
AuthorDate: Wed Sep 17 11:13:28 2025 +0200

    MINOR: Fix format in CoordinatorLoaderImpl (#20538)
    
    The format of the code in `CoordinatorLoaderImpl` is inconsistent with
    the rest of the code in the package. This small PR fixes it.
    
    Reviewers: Ken Huang <[email protected]>, TengYao Chi
     <[email protected]>, Andrew Schofield <[email protected]>, Sean
     Quah <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../common/runtime/CoordinatorLoaderImpl.java      | 79 +++++++++++-----------
 1 file changed, 38 insertions(+), 41 deletions(-)
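
For readers skimming the diff: the change is whitespace-only. Wrapped parameter
lists and continuation lines now use a four-space continuation indent instead of
the deeper indent used before, matching the style elsewhere in coordinator-common.
A minimal before/after sketch, abridged from the constructor shown in the first
hunk below (parameters trimmed, body elided):

    // before: continuation lines indented eight spaces past the declaration
    public CoordinatorLoaderImpl(
            Time time,
            Deserializer<T> deserializer,
            int loadBufferSize
    ) { /* ... */ }

    // after: continuation lines indented four spaces past the declaration
    public CoordinatorLoaderImpl(
        Time time,
        Deserializer<T> deserializer,
        int loadBufferSize
    ) { /* ... */ }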

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
index 078dad36ef8..6613ce25fc8 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
@@ -62,11 +62,11 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
     private final KafkaScheduler scheduler = new KafkaScheduler(1);
 
     public CoordinatorLoaderImpl(
-            Time time,
-            Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
-            Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
-            Deserializer<T> deserializer,
-            int loadBufferSize
+        Time time,
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
+        Deserializer<T> deserializer,
+        int loadBufferSize
     ) {
         this.time = time;
         this.partitionLogSupplier = partitionLogSupplier;
@@ -89,7 +89,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
         long startTimeMs = time.milliseconds();
         try {
             ScheduledFuture<?> result = scheduler.scheduleOnce(String.format("Load coordinator from %s", tp),
-                    () -> doLoad(tp, coordinator, future, startTimeMs));
+                () -> doLoad(tp, coordinator, future, startTimeMs));
             if (result.isCancelled()) {
                 future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
             }
@@ -100,17 +100,17 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
     }
 
     private void doLoad(
-            TopicPartition tp,
-            CoordinatorPlayback<T> coordinator,
-            CompletableFuture<LoadSummary> future,
-            long startTimeMs
+        TopicPartition tp,
+        CoordinatorPlayback<T> coordinator,
+        CompletableFuture<LoadSummary> future,
+        long startTimeMs
     ) {
         long schedulerQueueTimeMs = time.milliseconds() - startTimeMs;
         try {
             Optional<UnifiedLog> logOpt = partitionLogSupplier.apply(tp);
             if (logOpt.isEmpty()) {
                 future.completeExceptionally(new NotLeaderOrFollowerException(
-                        "Could not load records from " + tp + " because the log does not exist."));
+                    "Could not load records from " + tp + " because the log does not exist."));
                 return;
             }
 
@@ -142,8 +142,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
 
             if (logEndOffset(tp) == -1L) {
                 future.completeExceptionally(new NotLeaderOrFollowerException(
-                        String.format("Stopped loading records from %s because the partition is not online or is no longer the leader.", tp)
-                ));
+                    String.format("Stopped loading records from %s because the partition is not online or is no longer the leader.", tp)));
             } else if (isRunning.get()) {
                 future.complete(new LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, stats.numRecords, stats.numBytes));
             } else {
@@ -186,7 +185,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
             if (buffer.capacity() < bytesNeeded) {
                 if (loadBufferSize < bytesNeeded) {
                     LOG.warn("Loaded metadata from {} with buffer larger ({} bytes) than" +
-                            " configured buffer size ({} bytes).", tp, bytesNeeded, loadBufferSize);
+                        " configured buffer size ({} bytes).", tp, bytesNeeded, loadBufferSize);
                 }
 
                 buffer = ByteBuffer.allocate(bytesNeeded);
@@ -202,15 +201,14 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
     }
 
     private ReplayResult processMemoryRecords(
-            TopicPartition tp,
-            UnifiedLog log,
-            MemoryRecords memoryRecords,
-            CoordinatorPlayback<T> coordinator,
-            LoadStats loadStats,
-            long currentOffset,
-            long previousHighWatermark
+        TopicPartition tp,
+        UnifiedLog log,
+        MemoryRecords memoryRecords,
+        CoordinatorPlayback<T> coordinator,
+        LoadStats loadStats,
+        long currentOffset,
+        long previousHighWatermark
     ) {
-
         for (MutableRecordBatch batch : memoryRecords.batches()) {
             if (batch.isControlBatch()) {
                 for (Record record : batch) {
@@ -220,8 +218,8 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
                     if (controlRecord == ControlRecordType.COMMIT) {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Replaying end transaction marker from {} at offset {} to commit" +
-                                            " transaction with producer id {} and producer epoch {}.",
-                                    tp, record.offset(), batch.producerId(), batch.producerEpoch());
+                                " transaction with producer id {} and producer epoch {}.",
+                                tp, record.offset(), batch.producerId(), batch.producerEpoch());
                         }
                         coordinator.replayEndTransactionMarker(
                                 batch.producerId(),
@@ -231,8 +229,8 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
                     } else if (controlRecord == ControlRecordType.ABORT) {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Replaying end transaction marker from {} at offset {} to abort" +
-                                            " transaction with producer id {} and producer epoch {}.",
-                                    tp, record.offset(), batch.producerId(), batch.producerEpoch());
+                                " transaction with producer id {} and producer epoch {}.",
+                                tp, record.offset(), batch.producerId(), batch.producerEpoch());
                         }
                         coordinator.replayEndTransactionMarker(
                                 batch.producerId(),
@@ -250,7 +248,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
                         coordinatorRecordOpt = Optional.ofNullable(deserializer.deserialize(record.key(), record.value()));
                     } catch (Deserializer.UnknownRecordTypeException ex) {
                         LOG.warn("Unknown record type {} while loading offsets and group metadata from {}." +
-                                " Ignoring it. It could be a left over from an aborted upgrade.", ex.unknownType(), tp);
+                            " Ignoring it. It could be a left over from an aborted upgrade.", ex.unknownType(), tp);
                     } catch (RuntimeException ex) {
                         String msg = String.format("Deserializing record %s from %s failed.", record, tp);
                         LOG.error(msg, ex);
@@ -261,18 +259,18 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
                         try {
                             if (LOG.isTraceEnabled()) {
                                 LOG.trace("Replaying record {} from {} at offset {} with producer id {}" +
-                                        " and producer epoch {}.", coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch());
+                                    " and producer epoch {}.", coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch());
                             }
                             coordinator.replay(
-                                    record.offset(),
-                                    batch.producerId(),
-                                    batch.producerEpoch(),
-                                    coordinatorRecord
+                                record.offset(),
+                                batch.producerId(),
+                                batch.producerEpoch(),
+                                coordinatorRecord
                             );
                         } catch (RuntimeException ex) {
                             String msg = String.format("Replaying record %s from %s at offset %d with producer id %d and" +
-                                            " producer epoch %d failed.", coordinatorRecord, tp, record.offset(),
-                                    batch.producerId(), batch.producerEpoch());
+                                " producer epoch %d failed.", coordinatorRecord, tp, record.offset(),
+                                batch.producerId(), batch.producerEpoch());
                             LOG.error(msg, ex);
                             throw new RuntimeException(msg, ex);
                         }
@@ -320,14 +318,13 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
 
         @Override
         public String toString() {
-            return "LoadStats{" +
-                    "numRecords=" + numRecords +
-                    ", numBytes=" + numBytes +
-                    ", readAtLeastOneRecord=" + readAtLeastOneRecord +
-                    '}';
+            return "LoadStats(" +
+                "numRecords=" + numRecords +
+                ", numBytes=" + numBytes +
+                ", readAtLeastOneRecord=" + readAtLeastOneRecord +
+                ')';
         }
     }
 
-    private record ReplayResult(long nextOffset, long highWatermark) {
-    }
+    private record ReplayResult(long nextOffset, long highWatermark) { }
 }
