ijuma commented on code in PR #14529:
URL: https://github.com/apache/kafka/pull/14529#discussion_r1356970370


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -0,0 +1,887 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.attribute.FileTime;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import 
org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import static java.util.Arrays.asList;
+
+/**
+ * A segment of the log. Each segment has two components: a log and an index. 
The log is a FileRecords containing
+ * the actual messages. The index is an OffsetIndex that maps from logical 
offsets to physical file positions. Each
+ * segment has a base offset which is an offset <= the least offset of any 
message in this segment and > any offset in
+ * any previous segment.
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, 
a [base_offset].index and a [base_offset].log file.
+ *
+ * This class is not thread-safe.
+ */
+public class LogSegment {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LogSegment.class);
+    private static final Timer LOG_FLUSH_TIMER;
+
+    static {
+        KafkaMetricsGroup logFlushStatsMetricsGroup = new 
KafkaMetricsGroup(LogSegment.class) {
+            @Override
+            public MetricName metricName(String name, Map<String, String> 
tags) {
+                // Override the group and type names for compatibility - this 
metrics group was previously defined within
+                // a Scala object named `kafka.log.LogFlushStats`
+                return KafkaMetricsGroup.explicitMetricName("kafka.log", 
"LogFlushStats", name, tags);
+            }
+        };
+        LOG_FLUSH_TIMER = 
logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", 
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+    }
+
+    private final FileRecords log;
+    private final LazyIndex<OffsetIndex> lazyOffsetIndex;
+    private final LazyIndex<TimeIndex> lazyTimeIndex;
+    private final TransactionIndex txnIndex;
+    private final long baseOffset;
+    private final int indexIntervalBytes;
+    private final long rollJitterMs;
+    private final Time time;
+
+    // The timestamp we used for time based log rolling and for ensuring max 
compaction delay
+    // volatile for LogCleaner to see the update
+    private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty();
+
+    /* The maximum timestamp and offset we see so far */
+    private volatile TimestampOffset maxTimestampAndOffsetSoFar = 
TimestampOffset.UNKNOWN;
+
+    private long created;
+
+    /* the number of bytes since we last added an entry in the offset index */
+    private int bytesSinceLastIndexEntry = 0;
+
+    /**
+     * Create a LogSegment with the provided parameters.
+     *
+     * @param log The file records containing log entries
+     * @param lazyOffsetIndex The offset index
+     * @param lazyTimeIndex The timestamp index
+     * @param txnIndex The transaction index
+     * @param baseOffset A lower bound on the offsets in this segment
+     * @param indexIntervalBytes The approximate number of bytes between 
entries in the index
+     * @param rollJitterMs The maximum random jitter subtracted from the 
scheduled segment roll time
+     * @param time The time instance
+     */
+    public LogSegment(FileRecords log,
+                      LazyIndex<OffsetIndex> lazyOffsetIndex,
+                      LazyIndex<TimeIndex> lazyTimeIndex,
+                      TransactionIndex txnIndex,
+                      long baseOffset,
+                      int indexIntervalBytes,
+                      long rollJitterMs,
+                      Time time) {
+        this.log = log;
+        this.lazyOffsetIndex = lazyOffsetIndex;
+        this.lazyTimeIndex = lazyTimeIndex;
+        this.txnIndex = txnIndex;
+        this.baseOffset = baseOffset;
+        this.indexIntervalBytes = indexIntervalBytes;
+        this.rollJitterMs = rollJitterMs;
+        this.time = time;
+        this.created = time.milliseconds();
+    }
+
+    // Visible for testing
+    public LogSegment(LogSegment segment) {
+        this(segment.log, segment.lazyOffsetIndex, segment.lazyTimeIndex, 
segment.txnIndex, segment.baseOffset,
+                segment.indexIntervalBytes, segment.rollJitterMs, 
segment.time);
+    }
+
+    public OffsetIndex offsetIndex() throws IOException {
+        return lazyOffsetIndex.get();
+    }
+
+    public File offsetIndexFile() {
+        return lazyOffsetIndex.file();
+    }
+
+    public TimeIndex timeIndex() throws IOException {
+        return lazyTimeIndex.get();
+    }
+
+    public File timeIndexFile() {
+        return lazyTimeIndex.file();
+    }
+
+    public long baseOffset() {
+        return baseOffset;
+    }
+
+    public FileRecords log() {
+        return log;
+    }
+
+    public long rollJitterMs() {
+        return rollJitterMs;
+    }
+
+    public TransactionIndex txnIndex() {
+        return txnIndex;
+    }
+
+    public boolean shouldRoll(RollParams rollParams) throws IOException {
+        boolean reachedRollMs = timeWaitedForRoll(rollParams.now, 
rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs;
+        int size = size();
+        return size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
+            (size > 0 && reachedRollMs) ||
+            offsetIndex().isFull() || timeIndex().isFull() || 
!canConvertToRelativeOffset(rollParams.maxOffsetInMessages);
+    }
+
+    public void resizeIndexes(int size) throws IOException {
+        offsetIndex().resize(size);
+        timeIndex().resize(size);
+    }
+
+    public void sanityCheck(boolean timeIndexFileNewlyCreated) throws 
IOException {
+        if (offsetIndexFile().exists()) {
+            // Resize the time index file to 0 if it is newly created.
+            if (timeIndexFileNewlyCreated)
+              timeIndex().resize(0);
+            // Sanity checks for time index and offset index are skipped 
because
+            // we will recover the segments above the recovery point in 
recoverLog()
+            // in any case so sanity checking them here is redundant.
+            txnIndex.sanityCheck();
+        } else
+            throw new NoSuchFileException("Offset index file " + 
offsetIndexFile().getAbsolutePath() + " does not exist");
+    }
+
+    /**
+     * The first time this is invoked, it will result in a time index lookup 
(including potential materialization of
+     * the time index).
+     */
+    public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException 
{

Review Comment:
   I added a `read` prefix to this to make it clearer that it does more than 
than the field `maxTimestampAndOffsetSoFar`. The conversion from Scala to Java 
had originally caused some code to use the field instead of the method, which 
lead to some subtly different behavior in some cases.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -0,0 +1,887 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.attribute.FileTime;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import 
org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import static java.util.Arrays.asList;
+
+/**
+ * A segment of the log. Each segment has two components: a log and an index. 
The log is a FileRecords containing
+ * the actual messages. The index is an OffsetIndex that maps from logical 
offsets to physical file positions. Each
+ * segment has a base offset which is an offset <= the least offset of any 
message in this segment and > any offset in
+ * any previous segment.
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, 
a [base_offset].index and a [base_offset].log file.
+ *
+ * This class is not thread-safe.
+ */
+public class LogSegment {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LogSegment.class);
+    private static final Timer LOG_FLUSH_TIMER;
+
+    static {
+        KafkaMetricsGroup logFlushStatsMetricsGroup = new 
KafkaMetricsGroup(LogSegment.class) {
+            @Override
+            public MetricName metricName(String name, Map<String, String> 
tags) {
+                // Override the group and type names for compatibility - this 
metrics group was previously defined within
+                // a Scala object named `kafka.log.LogFlushStats`
+                return KafkaMetricsGroup.explicitMetricName("kafka.log", 
"LogFlushStats", name, tags);
+            }
+        };
+        LOG_FLUSH_TIMER = 
logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", 
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+    }
+
+    private final FileRecords log;
+    private final LazyIndex<OffsetIndex> lazyOffsetIndex;
+    private final LazyIndex<TimeIndex> lazyTimeIndex;
+    private final TransactionIndex txnIndex;
+    private final long baseOffset;
+    private final int indexIntervalBytes;
+    private final long rollJitterMs;
+    private final Time time;
+
+    // The timestamp we used for time based log rolling and for ensuring max 
compaction delay
+    // volatile for LogCleaner to see the update
+    private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty();
+
+    /* The maximum timestamp and offset we see so far */
+    private volatile TimestampOffset maxTimestampAndOffsetSoFar = 
TimestampOffset.UNKNOWN;
+
+    private long created;
+
+    /* the number of bytes since we last added an entry in the offset index */
+    private int bytesSinceLastIndexEntry = 0;
+
+    /**
+     * Create a LogSegment with the provided parameters.
+     *
+     * @param log The file records containing log entries
+     * @param lazyOffsetIndex The offset index
+     * @param lazyTimeIndex The timestamp index
+     * @param txnIndex The transaction index
+     * @param baseOffset A lower bound on the offsets in this segment
+     * @param indexIntervalBytes The approximate number of bytes between 
entries in the index
+     * @param rollJitterMs The maximum random jitter subtracted from the 
scheduled segment roll time
+     * @param time The time instance
+     */
+    public LogSegment(FileRecords log,
+                      LazyIndex<OffsetIndex> lazyOffsetIndex,
+                      LazyIndex<TimeIndex> lazyTimeIndex,
+                      TransactionIndex txnIndex,
+                      long baseOffset,
+                      int indexIntervalBytes,
+                      long rollJitterMs,
+                      Time time) {
+        this.log = log;
+        this.lazyOffsetIndex = lazyOffsetIndex;
+        this.lazyTimeIndex = lazyTimeIndex;
+        this.txnIndex = txnIndex;
+        this.baseOffset = baseOffset;
+        this.indexIntervalBytes = indexIntervalBytes;
+        this.rollJitterMs = rollJitterMs;
+        this.time = time;
+        this.created = time.milliseconds();
+    }
+
+    // Visible for testing
+    public LogSegment(LogSegment segment) {
+        this(segment.log, segment.lazyOffsetIndex, segment.lazyTimeIndex, 
segment.txnIndex, segment.baseOffset,
+                segment.indexIntervalBytes, segment.rollJitterMs, 
segment.time);
+    }
+
+    public OffsetIndex offsetIndex() throws IOException {
+        return lazyOffsetIndex.get();
+    }
+
+    public File offsetIndexFile() {
+        return lazyOffsetIndex.file();
+    }
+
+    public TimeIndex timeIndex() throws IOException {
+        return lazyTimeIndex.get();
+    }
+
+    public File timeIndexFile() {
+        return lazyTimeIndex.file();
+    }
+
+    public long baseOffset() {
+        return baseOffset;
+    }
+
+    public FileRecords log() {
+        return log;
+    }
+
+    public long rollJitterMs() {
+        return rollJitterMs;
+    }
+
+    public TransactionIndex txnIndex() {
+        return txnIndex;
+    }
+
+    public boolean shouldRoll(RollParams rollParams) throws IOException {
+        boolean reachedRollMs = timeWaitedForRoll(rollParams.now, 
rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs;
+        int size = size();
+        return size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
+            (size > 0 && reachedRollMs) ||
+            offsetIndex().isFull() || timeIndex().isFull() || 
!canConvertToRelativeOffset(rollParams.maxOffsetInMessages);
+    }
+
+    public void resizeIndexes(int size) throws IOException {
+        offsetIndex().resize(size);
+        timeIndex().resize(size);
+    }
+
+    public void sanityCheck(boolean timeIndexFileNewlyCreated) throws 
IOException {
+        if (offsetIndexFile().exists()) {
+            // Resize the time index file to 0 if it is newly created.
+            if (timeIndexFileNewlyCreated)
+              timeIndex().resize(0);
+            // Sanity checks for time index and offset index are skipped 
because
+            // we will recover the segments above the recovery point in 
recoverLog()
+            // in any case so sanity checking them here is redundant.
+            txnIndex.sanityCheck();
+        } else
+            throw new NoSuchFileException("Offset index file " + 
offsetIndexFile().getAbsolutePath() + " does not exist");
+    }
+
+    /**
+     * The first time this is invoked, it will result in a time index lookup 
(including potential materialization of
+     * the time index).
+     */
+    public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException 
{

Review Comment:
   I added a `read` prefix to this to make it clearer that it does more than 
than the field `maxTimestampAndOffsetSoFar`. The conversion from Scala to Java 
had originally caused some code to use the field instead of the method, which 
led to some subtly different behavior in some cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to