ijuma commented on code in PR #14529:
URL: https://github.com/apache/kafka/pull/14529#discussion_r1356883718


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.attribute.FileTime;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import 
org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import static java.util.Arrays.asList;
+
+/**
+ * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing
+ * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
+ * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
+ * any previous segment.
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
+ *
+ * This class is not thread-safe.
+ */
+public class LogSegment {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LogSegment.class);
+    private static final Timer LOG_FLUSH_TIMER;
+
+    static {
+        KafkaMetricsGroup logFlushStatsMetricsGroup = new 
KafkaMetricsGroup(LogSegment.class) {
+            @Override
+            public MetricName metricName(String name, Map<String, String> 
tags) {
+                // Override the group and type names for compatibility
+                return KafkaMetricsGroup.explicitMetricName("kafka.log", 
"LogFlushStats", name, tags);
+            }
+        };
+        LOG_FLUSH_TIMER = 
logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", 
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+    }
+
+    private final FileRecords log;
+    private final LazyIndex<OffsetIndex> lazyOffsetIndex;
+    private final LazyIndex<TimeIndex> lazyTimeIndex;
+    private final TransactionIndex txnIndex;
+    private final long baseOffset;
+    private final int indexIntervalBytes;
+    private final long rollJitterMs;
+    private final Time time;
+
+    // The timestamp we used for time based log rolling and for ensuring max compaction delay
+    // volatile for LogCleaner to see the update
+    private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty();
+
+    /* The maximum timestamp and offset we see so far */
+    private volatile TimestampOffset maxTimestampAndOffsetSoFar = 
TimestampOffset.UNKNOWN;
+
+    private long created;
+
+    /* the number of bytes since we last added an entry in the offset index */
+    private int bytesSinceLastIndexEntry = 0;
+
+    /**
+     * Create a LogSegment with the provided parameters.
+     *
+     * @param log The file records containing log entries
+     * @param lazyOffsetIndex The offset index
+     * @param lazyTimeIndex The timestamp index
+     * @param txnIndex The transaction index
+     * @param baseOffset A lower bound on the offsets in this segment
+     * @param indexIntervalBytes The approximate number of bytes between 
entries in the index
+     * @param rollJitterMs The maximum random jitter subtracted from the 
scheduled segment roll time
+     * @param time The time instance
+     */
+    public LogSegment(FileRecords log,
+                      LazyIndex<OffsetIndex> lazyOffsetIndex,
+                      LazyIndex<TimeIndex> lazyTimeIndex,
+                      TransactionIndex txnIndex,
+                      long baseOffset,
+                      int indexIntervalBytes,
+                      long rollJitterMs,
+                      Time time) {
+        this.log = log;
+        this.lazyOffsetIndex = lazyOffsetIndex;
+        this.lazyTimeIndex = lazyTimeIndex;
+        this.txnIndex = txnIndex;
+        this.baseOffset = baseOffset;
+        this.indexIntervalBytes = indexIntervalBytes;
+        this.rollJitterMs = rollJitterMs;
+        this.time = time;
+        this.created = time.milliseconds();
+    }
+
+    // Visible for testing
+    public LogSegment(LogSegment segment) {
+        this(segment.log, segment.lazyOffsetIndex, segment.lazyTimeIndex, 
segment.txnIndex, segment.baseOffset,
+                segment.indexIntervalBytes, segment.rollJitterMs, 
segment.time);
+    }
+
+    public OffsetIndex offsetIndex() throws IOException {
+        return lazyOffsetIndex.get();
+    }
+
+    public File offsetIndexFile() {
+        return lazyOffsetIndex.file();
+    }
+
+    public TimeIndex timeIndex() throws IOException {
+        return lazyTimeIndex.get();
+    }
+
+    public File timeIndexFile() {

Review Comment:
   It's public because it needs to be a public method. What you suggested 
doesn't work because it forces materialization, which would be a serious 
regression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to