TaiJuWu commented on code in PR #20635:
URL: https://github.com/apache/kafka/pull/20635#discussion_r2637811939


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java:
##########
@@ -33,4 +45,185 @@ public static LogSegment createSegment(long offset, File 
logDir, int indexInterv
         // Create and return the LogSegment instance
         return new LogSegment(ms, idx, timeIdx, txnIndex, offset, 
indexIntervalBytes, 0, time);
     }
+
+
+    /**
+     * Append an end transaction marker (commit or abort) to the log as a 
leader.
+     *
+     * @param transactionVersion the transaction version (1 = TV1, 2 = TV2) 
etc. Must be explicitly specified.
+     *                          TV2 markers require strict epoch validation 
(markerEpoch > currentEpoch),
+     *                          while legacy markers use relaxed validation 
(markerEpoch >= currentEpoch).
+     */
+    public static LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log,
+                                                           long producerId,
+                                                           short producerEpoch,
+                                                           ControlRecordType 
controlType,
+                                                           long timestamp,
+                                                           int 
coordinatorEpoch,
+                                                           int leaderEpoch,
+                                                           short 
transactionVersion) {
+        MemoryRecords records = endTxnRecords(controlType, producerId, 
producerEpoch, 0L, coordinatorEpoch, leaderEpoch, timestamp);
+
+        return log.appendAsLeader(records, leaderEpoch, 
AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
transactionVersion);
+    }
+
+    public static MemoryRecords endTxnRecords(ControlRecordType 
controlRecordType,
+                                              long producerId,
+                                              short epoch,
+                                              long offset,
+                                              int coordinatorEpoch,
+                                              int partitionLeaderEpoch,
+                                              long timestamp) {
+        EndTransactionMarker marker = new 
EndTransactionMarker(controlRecordType, coordinatorEpoch);
+        return MemoryRecords.withEndTransactionMarker(offset, timestamp, 
partitionLeaderEpoch, producerId, epoch, marker);
+    }
+
+    @SuppressWarnings("ParameterNumber")
+    public static UnifiedLog createLog(File dir,
+                                       LogConfig config,
+                                       BrokerTopicStats brokerTopicStats,
+                                       Scheduler scheduler,
+                                       Time time,
+                                       long logStartOffset,
+                                       long recoveryPoint,
+                                       int maxTransactionTimeoutMs,
+                                       ProducerStateManagerConfig 
producerStateManagerConfig,
+                                       int producerIdExpirationCheckIntervalMs,
+                                       boolean lastShutdownClean,
+                                       Optional<Uuid> topicId,
+                                       ConcurrentMap<String, Integer> 
numRemainingSegments,
+                                       boolean remoteStorageSystemEnable,
+                                       LogOffsetsListener logOffsetsListener) 
throws IOException {
+        return UnifiedLog.create(
+                dir,
+                config,
+                logStartOffset,
+                recoveryPoint,
+                scheduler,
+                brokerTopicStats,
+                time,
+                maxTransactionTimeoutMs,
+                producerStateManagerConfig,
+                producerIdExpirationCheckIntervalMs,
+                new LogDirFailureChannel(10),
+                lastShutdownClean,
+                topicId,
+                numRemainingSegments,
+                remoteStorageSystemEnable,
+                logOffsetsListener
+        );
+    }
+
+    public static class LogConfigBuilder {

Review Comment:
   Thanks for pointing that out; it actually simplifies the logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to