junrao commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1109170546
##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordConversionStats;
+import org.apache.kafka.common.requests.ProduceResponse.RecordError;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * Struct to hold various quantities we compute about each message set before
appending to the log.
+ */
+public class LogAppendInfo {
+
+public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new
LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(),
+RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
+RecordConversionStats.EMPTY, CompressionType.NONE,
CompressionType.NONE, -1, -1,
+false, -1L);
+
+private Optional firstOffset;
+private long lastOffset;
+private OptionalInt lastLeaderEpoch;
+private long maxTimestamp;
+private long offsetOfMaxTimestamp;
+private long logAppendTime;
+private long logStartOffset;
+private RecordConversionStats recordConversionStats;
+private CompressionType sourceCompression;
+private CompressionType targetCompression;
+private int shallowCount;
+private int validBytes;
+private boolean offsetsMonotonic;
+private long lastOffsetOfFirstBatch;
+private List recordErrors;
+private String errorMessage;
+private LeaderHwChange leaderHwChange;
+
+/**
+ * Creates an instance with the given params.
+ *
+ * @param firstOffset The first offset in the message set
unless the message format is less than V2 and we are appending
+ * to the follower. If the message is a
duplicate message the segment base offset and relative position
+ * in segment will be unknown.
+ * @param lastOffset The last offset in the message set
+ * @param lastLeaderEpoch The partition leader epoch corresponding
to the last offset, if available.
+ * @param maxTimestamp The maximum timestamp of the message set.
+ * @param offsetOfMaxTimestamp The offset of the message with the
maximum timestamp.
+ * @param logAppendTime The log append time (if used) of the
message set, otherwise RecordBatch.NO_TIMESTAMP
+ * @param logStartOffset The start offset of the log at the time
of this append.
+ * @param recordConversionStats Statistics collected during record
processing, `null` if `assignOffsets` is `false`
+ * @param sourceCompression The source codec used in the message set
(sent by the producer)
+ * @param targetCompression The target codec of the message set (after
applying the broker compression configuration if any)
+ * @param shallowCount The number of shallow messages
+ * @param validBytes The number of valid bytes
+ * @param offsetsMonotonic Are the offsets in this message set
monotonically increasing
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
+ */
+public LogAppendInfo(Optional firstOffset,
+ long lastOffset,
+ OptionalInt lastLeaderEpoch,
+ long maxTimestamp,
+ long offsetOfMaxTimestamp,
+ long logAppendTime,
+ long logStartOffset,
+ RecordConversionStats recordConversionStats,
+ CompressionType sourceCompression,
+ CompressionType targetCompression,
+ int shallowCount,
+ int validBytes,
+ boolean offsetsMonotonic,
+ long lastOffsetOfFirstBatch) {
+this(firstOffset, last