[GitHub] [kafka] junrao commented on a diff in pull request #13255: KAFKA-14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-17 Thread via GitHub


junrao commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1110260488


##
core/src/test/scala/other/kafka/StressTestLog.scala:
##
@@ -123,7 +123,8 @@ object StressTestLog {
   class WriterThread(val log: UnifiedLog) extends WorkerThread with LogProgress {
 override def work(): Unit = {
   val logAppendInfo = log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 0)
-  require(logAppendInfo.firstOffset.forall(_.messageOffset == currentOffset) && logAppendInfo.lastOffset == currentOffset)
+  require((!logAppendInfo.firstOffset.isPresent || logAppendInfo.firstOffset.get().messageOffset == currentOffset)

Review Comment:
   It seems that `logAppendInfo.firstOffset.isEmpty` is simpler than `!logAppendInfo.firstOffset.isPresent`.
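
   A minimal, standalone Scala sketch (not code from the PR; the `OptionalStyleSketch` object and its stand-in `firstOffset` value are hypothetical) comparing the two equivalent forms on a plain `java.util.Optional`; note that `Optional.isEmpty` is only available on Java 11+:

   ```scala
   import java.util.Optional

   // Standalone illustration of the review suggestion: firstOffset here is a
   // stand-in java.util.Optional, not the real LogAppendInfo field.
   object OptionalStyleSketch extends App {
 val firstOffset: Optional[Long] = Optional.empty()

 // Form used in the diff: double negation via !isPresent.
 assert(!firstOffset.isPresent)

 // Form suggested in the review: isEmpty reads more directly (Java 11+).
 assert(firstOffset.isEmpty)
   }
   ```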



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13255: KAFKA-14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-16 Thread via GitHub


junrao commented on code in PR #13255:
URL: https://github.com/apache/kafka/pull/13255#discussion_r1109170546


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordConversionStats;
+import org.apache.kafka.common.requests.ProduceResponse.RecordError;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * Struct to hold various quantities we compute about each message set before appending to the log.
+ */
+public class LogAppendInfo {
+
+public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(),
+RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
+RecordConversionStats.EMPTY, CompressionType.NONE, CompressionType.NONE, -1, -1,
+false, -1L);
+
+private Optional<LogOffsetMetadata> firstOffset;
+private long lastOffset;
+private OptionalInt lastLeaderEpoch;
+private long maxTimestamp;
+private long offsetOfMaxTimestamp;
+private long logAppendTime;
+private long logStartOffset;
+private RecordConversionStats recordConversionStats;
+private CompressionType sourceCompression;
+private CompressionType targetCompression;
+private int shallowCount;
+private int validBytes;
+private boolean offsetsMonotonic;
+private long lastOffsetOfFirstBatch;
+private List<RecordError> recordErrors;
+private String errorMessage;
+private LeaderHwChange leaderHwChange;
+
+/**
+ * Creates an instance with the given params.
+ *
+ * @param firstOffset            The first offset in the message set unless the message format is less than V2 and we are appending
+ *                               to the follower. If the message is a duplicate message the segment base offset and relative position
+ *                               in segment will be unknown.
+ * @param lastOffset             The last offset in the message set
+ * @param lastLeaderEpoch        The partition leader epoch corresponding to the last offset, if available.
+ * @param maxTimestamp           The maximum timestamp of the message set.
+ * @param offsetOfMaxTimestamp   The offset of the message with the maximum timestamp.
+ * @param logAppendTime          The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param logStartOffset         The start offset of the log at the time of this append.
+ * @param recordConversionStats  Statistics collected during record processing, `null` if `assignOffsets` is `false`
+ * @param sourceCompression      The source codec used in the message set (sent by the producer)
+ * @param targetCompression      The target codec of the message set (after applying the broker compression configuration if any)
+ * @param shallowCount           The number of shallow messages
+ * @param validBytes             The number of valid bytes
+ * @param offsetsMonotonic       Are the offsets in this message set monotonically increasing
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
+ */
+public LogAppendInfo(Optional<LogOffsetMetadata> firstOffset,
+ long lastOffset,
+ OptionalInt lastLeaderEpoch,
+ long maxTimestamp,
+ long offsetOfMaxTimestamp,
+ long logAppendTime,
+ long logStartOffset,
+ RecordConversionStats recordConversionStats,
+ CompressionType sourceCompression,
+ CompressionType targetCompression,
+ int shallowCount,
+ int validBytes,
+ boolean offsetsMonotonic,
+ long lastOffsetOfFirstBatch) {
+this(firstOffset, last