Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-06-04 Thread via GitHub


nizhikov commented on PR #14034:
URL: https://github.com/apache/kafka/pull/14034#issuecomment-2147235352

   Hello @satishd 
   
   Are you still working on this PR? 
   I can step in and finish the task if you don't have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-02-12 Thread via GitHub


satishd commented on PR #14034:
URL: https://github.com/apache/kafka/pull/14034#issuecomment-1938978627

   Resolved the recent conflicts caused by trunk changes to `LocalLog` by rebasing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-31 Thread via GitHub


satishd commented on PR #14034:
URL: https://github.com/apache/kafka/pull/14034#issuecomment-1918983502

   Resolved the recent conflicts caused by trunk changes to `UnifiedLog`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-23 Thread via GitHub


satishd commented on PR #14034:
URL: https://github.com/apache/kafka/pull/14034#issuecomment-1906403295

   Thanks @ijuma for the review comments. Updated the PR, addressing them in inline replies and/or in the latest commits.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-22 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1461551624


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -0,0 +1,1115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.FileLogInputStream;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.Scheduler;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * An append-only log for storing messages locally. The log is a sequence of 
LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe.
+ */
+public class LocalLog {
+
+/**
+ * A file that is scheduled to be deleted
+ */
+public static final String DELETED_FILE_SUFFIX = 
LogFileUtils.DELETED_FILE_SUFFIX;
+
+/**
+ * A temporary file that is being used for log cleaning
+ */
+public static final String CLEANED_FILE_SUFFIX = ".cleaned";
+
+/**
+ * A temporary file used when swapping files into the log
+ */
+public static final String SWAP_FILE_SUFFIX = ".swap";
+
+/**
+ * A directory that is scheduled to be deleted
+ */
+public static final String DELETE_DIR_SUFFIX = "-delete";
+
+/**
+ * A directory that is used for future partition
+ */
+public static final String FUTURE_DIR_SUFFIX = "-future";
+public static final String STRAY_DIR_SUFFIX = "-stray";
+
+public static final Pattern DELETE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + DELETE_DIR_SUFFIX);
+public static final Pattern FUTURE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);
+public static final Pattern STRAY_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + STRAY_DIR_SUFFIX);
+
+public static final long UNKNOWN_OFFSET = -1L;
+
+private final Logger logger;
+
+private final LogSegments segments;
+private final Scheduler scheduler;
+private final Time time;
+private final TopicPartition topicPartition;
+private final LogDirFailureChannel logDirFailureChannel;
+
+// Last time the log was flushed
+private final AtomicLong lastFlushedTime;
+
+private volatile File dir;
+private volatile LogConfig config;
+private volatile long recoveryPoint;
+private volatile LogOffsetMetadata nextOffsetMetadata;
+
+// Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+private volatile String parentDir;
+// The memory mapped buffer for index files of this log will be closed 
with either delete() or closeHandlers()
+// After memory mapped buffer is closed, no disk IO operation should be 
performed for this log.
+private volatile boolean isMemoryMappedBufferClosed = false;
+
+/**
+ * Creates a new LocalLog instance.
+ *
+ * @param dir   

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-22 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1461537909


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-22 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1461537015


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-22 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1461535963


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-22 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1461536622


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-22 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1461534346


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+public class LogTruncation implements SegmentDeletionReason {
+
+private final Logger logger;
+
+public LogTruncation(Logger logger) {
+this.logger = logger;
+}
+
+@Override
+public void logReason(List<LogSegment> toDelete) {
+logger.info("Deleting segments as part of log truncation: {}", 
Utils.join(toDelete, ", "));

Review Comment:
   This is deliberate; the log message reads better with that separator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-22 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1461532356


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-12 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1451284748


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -1245,8 +1246,8 @@ private[log] class CleanedTransactionMetadata {
*
* @param abortedTransactions The new found aborted transactions to add
*/
-  def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
-this.abortedTransactions ++= abortedTransactions
+  def addAbortedTransactions(abortedTransactions: util.List[AbortedTxn]): Unit 
= {
+abortedTransactions.forEach(abortedTxn => this.abortedTransactions += 
abortedTxn)

Review Comment:
   This was done to avoid a Scala collection conversion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-12 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1451284647


##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -3951,18 +3953,18 @@ class UnifiedLogTest {
 assertEquals(10, log.logSegments.size())
 
 {
-  val deletable = log.deletableSegments(
-(segment: LogSegment, _: Option[LogSegment]) => segment.baseOffset <= 
5)
-  val expected = log.nonActiveLogSegmentsFrom(0L).asScala.filter(segment 
=> segment.baseOffset <= 5).toList
-  assertEquals(6, expected.length)
-  assertEquals(expected, deletable.toList)
+  val deletable = new util.ArrayList(

Review Comment:
   This is not strictly needed, since the method returns an `ArrayList` in this test's success scenario, but it can also return an `EmptyList` instance in other cases. I wanted the test to fail with details about the elements rather than a type mismatch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-12 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1451284145


##
core/src/main/scala/kafka/log/LogLoader.scala:
##
@@ -489,16 +488,16 @@ class LogLoader(
*
* @param segmentsToDelete The log segments to schedule for deletion
*/
-  private def removeAndDeleteSegmentsAsync(segmentsToDelete: 
Iterable[LogSegment]): Unit = {
-if (segmentsToDelete.nonEmpty) {
+  private def removeAndDeleteSegmentsAsync(segmentsToDelete: 
java.util.Collection[LogSegment]): Unit = {
+if (!segmentsToDelete.isEmpty) {
   // Most callers hold an iterator into the `params.segments` collection 
and
   // `removeAndDeleteSegmentAsync` mutates it by removing the deleted 
segment. Therefore,
   // we should force materialization of the iterator here, so that results 
of the iteration
   // remain valid and deterministic. We should also pass only the 
materialized view of the
   // iterator to the logic that deletes the segments.
-  val toDelete = segmentsToDelete.toList
-  info(s"Deleting segments as part of log recovery: 
${toDelete.mkString(",")}")
-  toDelete.foreach { segment =>
+  val toDelete = new util.ArrayList[LogSegment](segmentsToDelete)
+  info(s"Deleting segments as part of log recovery: 
${LocalLog.mkString(toDelete.iterator(), ",")}")

Review Comment:
   Thanks for letting me know about `Utils.join`; I was not aware of it.



##
core/src/test/scala/unit/kafka/log/LocalLogTest.scala:
##
@@ -362,8 +363,8 @@ class LocalLogTest {
 }
 assertEquals(5, log.segments.numberOfSegments)
 assertNotEquals(10L, log.segments.activeSegment.baseOffset)
-val expected = log.segments.values.asScala.toVector
-val deleted = log.truncateFullyAndStartAt(10L)
+val expected = new util.ArrayList(log.segments.values)
+val deleted = new util.ArrayList(log.truncateFullyAndStartAt(10L))

Review Comment:
   They return different types, e.g. Java's `ConcurrentSkipListMap` values view versus `ArrayList`, so a direct equality check would fail. Wrapping both sides in `ArrayList` instances makes the equality check compare elements.
   The same approach is followed in LocalLogTest and UnifiedLogTest.
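
   As a minimal, self-contained sketch of the collection-equality behaviour described above (class and element names are illustrative, not the actual test code): a `ConcurrentSkipListMap` values view has no content-based equals, while wrapping both sides in `ArrayList` makes the comparison element-wise.

   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.List;
   import java.util.concurrent.ConcurrentSkipListMap;

   public class ValuesViewEqualityExample {
       public static void main(String[] args) {
           ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
           segments.put(0L, "segment-0");
           segments.put(10L, "segment-10");

           // values() returns a live view that does not override equals()
           Collection<String> view = segments.values();
           List<String> expected = Arrays.asList("segment-0", "segment-10");

           System.out.println(view.equals(expected));                  // false: identity-based equals
           System.out.println(new ArrayList<>(view).equals(expected)); // true: List equality compares elements
       }
   }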



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-11 Thread via GitHub


ijuma commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1448871794


##
core/src/main/scala/kafka/log/LogLoader.scala:
##
@@ -489,16 +488,16 @@ class LogLoader(
*
* @param segmentsToDelete The log segments to schedule for deletion
*/
-  private def removeAndDeleteSegmentsAsync(segmentsToDelete: 
Iterable[LogSegment]): Unit = {
-if (segmentsToDelete.nonEmpty) {
+  private def removeAndDeleteSegmentsAsync(segmentsToDelete: 
java.util.Collection[LogSegment]): Unit = {
+if (!segmentsToDelete.isEmpty) {
   // Most callers hold an iterator into the `params.segments` collection 
and
   // `removeAndDeleteSegmentAsync` mutates it by removing the deleted 
segment. Therefore,
   // we should force materialization of the iterator here, so that results 
of the iteration
   // remain valid and deterministic. We should also pass only the 
materialized view of the
   // iterator to the logic that deletes the segments.
-  val toDelete = segmentsToDelete.toList
-  info(s"Deleting segments as part of log recovery: 
${toDelete.mkString(",")}")
-  toDelete.foreach { segment =>
+  val toDelete = new util.ArrayList[LogSegment](segmentsToDelete)
+  info(s"Deleting segments as part of log recovery: 
${LocalLog.mkString(toDelete.iterator(), ",")}")

Review Comment:
   Can you not use `Utils.join` instead of creating a new implementation here?
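
   A minimal sketch of the suggested call, assuming the `Utils.join(Collection, String)` overload from `org.apache.kafka.common.utils.Utils` (segment names are illustrative):

   import java.util.Arrays;
   import java.util.List;

   import org.apache.kafka.common.utils.Utils;

   public class UtilsJoinExample {
       public static void main(String[] args) {
           // Stand-in for the materialized list of LogSegments in the hunk above.
           List<String> toDelete = Arrays.asList("00000000000000000000.log", "00000000000000000042.log");
           // Joins the elements with the given separator, as LogTruncation.logReason does with ", ".
           System.out.println("Deleting segments as part of log recovery: " + Utils.join(toDelete, ","));
       }
   }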



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2246,30 +2251,31 @@ object UnifiedLog extends Logging {
   config: LogConfig,
   scheduler: Scheduler,
   logDirFailureChannel: 
LogDirFailureChannel,
-  logPrefix: String): 
SplitSegmentResult = {
-LocalLog.splitOverflowedSegment(segment, existingSegments, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
+  logger: Logger): SplitSegmentResult 
= {
+LocalLog.splitOverflowedSegment(segment, existingSegments, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logger)
   }
 
-  private[log] def deleteProducerSnapshots(segments: Iterable[LogSegment],
+  private[log] def deleteProducerSnapshots(segments: 
util.Collection[LogSegment],
producerStateManager: 
ProducerStateManager,
asyncDelete: Boolean,
scheduler: Scheduler,
config: LogConfig,
logDirFailureChannel: 
LogDirFailureChannel,
parentDir: String,
topicPartition: TopicPartition): 
Unit = {
-val snapshotsToDelete = segments.flatMap { segment =>
-  
producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset).asScala
-}
+val snapshotsToDelete = segments.stream().flatMap { segment =>
+  val snapshotOptional = 
producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset)
+  if (snapshotOptional.isPresent) 
util.stream.Stream.of[SnapshotFile](snapshotOptional.get) else 
util.stream.Stream.empty[SnapshotFile]
+}.collect(Collectors.toList[SnapshotFile])
 
 def deleteProducerSnapshots(): Unit = {
-  LocalLog.maybeHandleIOException(logDirFailureChannel,
-parentDir,
-s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir") {
-snapshotsToDelete.foreach { snapshot =>
-  snapshot.deleteIfExists()
-}
-  }
+  LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir,
+() => s"Error while deleting producer state snapshots for 
$topicPartition in dir $parentDir",
+() => {

Review Comment:
   I don't think you need the block since there is only one expression inside 
it.
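
   For context on the `flatMap` in the hunk above: it flattens per-segment `Optional` results into a list; `Optional.stream()` would be an alternative on Java 9+. A minimal, self-contained sketch of the same idiom (names and data are illustrative):

   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   public class OptionalFlatMapExample {
       public static void main(String[] args) {
           Map<String, Integer> lookup = new HashMap<>();
           lookup.put("a", 1);
           lookup.put("c", 3);

           // Turn each Optional into a 0- or 1-element Stream and flatten.
           List<Integer> found = Arrays.asList("a", "b", "c").stream()
                   .flatMap(key -> {
                       Optional<Integer> value = Optional.ofNullable(lookup.get(key));
                       return value.isPresent() ? Stream.of(value.get()) : Stream.<Integer>empty();
                   })
                   .collect(Collectors.toList());

           System.out.println(found); // prints [1, 3]
       }
   }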



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -3971,10 +3973,10 @@ class UnifiedLogTest {
   ))
   log.appendAsLeader(records, leaderEpoch = 0)
   log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-  val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment]) => true)
-  val expected = log.logSegments.asScala.toList
-  assertEquals(10, expected.length)
-  assertEquals(expected, deletable.toList)
+  val deletable = new util.ArrayList(log.deletableSegments((_: LogSegment, 
_: Optional[LogSegment]) => true))
+  val expected = new util.ArrayList(log.logSegments)

Review Comment:
   Are these copies needed?



##
core/src/test/scala/unit/kafka/log/LocalLogTest.scala:
##
@@ -362,8 +363,8 @@ class LocalLogTest {
 }
 assertEquals(5, log.segments.numberOfSegments)
 

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442403275


##
gradle/spotbugs-exclude.xml:
##
@@ -162,8 +162,7 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
  on the compiler to fail if the code is changed to call a method 
that throws Exception.
  Given that, this bug pattern doesn't make sense for Scala code. 
-->
 
-
-
+

Review Comment:
   Right, it is not needed; I made the corresponding changes in the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1440213705


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
 def deleteProducerSnapshots(): Unit = {
   LocalLog.maybeHandleIOException(logDirFailureChannel,
 parentDir,
-s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir") {
+s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir", {
 snapshotsToDelete.foreach { snapshot =>
   snapshot.deleteIfExists()
 }
-  }
+  return;

Review Comment:
   The compiler expects a return for the inline declaration; it throws a type mismatch error without it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402745


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def collectAbortedTransactions(startOffset: Long, 
upperBoundOffset: Long): List[AbortedTxn] = {

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved to the storage module.
   






##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
 def deleteProducerSnapshots(): Unit = {
   LocalLog.maybeHandleIOException(logDirFailureChannel,
 parentDir,
-s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir") {
+s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir", {
 snapshotsToDelete.foreach { snapshot =>
   snapshot.deleteIfExists()
 }
-  }
+  return;

Review Comment:
   The compiler expects a return for the inline declaration; it throws a type mismatch error without it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402573


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*  (if there is one). It returns true iff the segment is 
deletable.
* @return the segments ready to be deleted
*/
-  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], 
upperBoundOffset: Long): Boolean = {
-  val allowDeletionDueToLogStartOffsetIncremented = 
nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
+  private[log] def deletableSegments(predicate: (LogSegment, 
Optional[LogSegment]) => Boolean): Iterable[LogSegment] = {

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved to the storage module.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402667


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
 localLog.checkIfMemoryMappedBufferClosed()
 // remove the segments for lookups
-localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, 
reason)
+localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava,  
true, reason)

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved to the storage module.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402484


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -0,0 +1,1146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.FileLogInputStream;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.Scheduler;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * An append-only log for storing messages locally. The log is a sequence of 
LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe.
+ */
+public class LocalLog {
+
+/**
+ * a file that is scheduled to be deleted
+ */
+public static final String DELETED_FILE_SUFFIX = 
LogFileUtils.DELETED_FILE_SUFFIX;
+
+/**
+ * A temporary file that is being used for log cleaning
+ */
+public static final String CLEANED_FILE_SUFFIX = ".cleaned";
+
+/**
+ * A temporary file used when swapping files into the log
+ */
+public static final String SWAP_FILE_SUFFIX = ".swap";
+
+/**
+ * a directory that is scheduled to be deleted
+ */
+public static final String DELETE_DIR_SUFFIX = "-delete";
+
+/**
+ * a directory that is used for future partition
+ */
+public static final String FUTURE_DIR_SUFFIX = "-future";
+public static final String STRAY_DIR_SUFFIX = "-stray";
+
+public static final Pattern DELETE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + DELETE_DIR_SUFFIX);
+public static final Pattern FUTURE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);
+public static final Pattern STRAY_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + STRAY_DIR_SUFFIX);
+
+public static final long UNKNOWN_OFFSET = -1L;
+
+private final Logger logger;
+
+private final LogSegments segments;
+private final Scheduler scheduler;
+private final Time time;
+private final TopicPartition topicPartition;
+private final LogDirFailureChannel logDirFailureChannel;
+
+// Last time the log was flushed
+private final AtomicLong lastFlushedTime;
+
+private volatile File dir;
+private volatile LogConfig config;
+private volatile long recoveryPoint;
+private volatile LogOffsetMetadata nextOffsetMetadata;
+
+// Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+private volatile String parentDir;
+// The memory mapped buffer for index files of this log will be closed 
with either delete() or closeHandlers()
+// After memory mapped buffer is closed, no disk IO operation should be 
performed for this log.
+private volatile boolean isMemoryMappedBufferClosed = false;
+
+/**
+ * Creates a new LocalLog instance.
+  

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402411


##
checkstyle/import-control-core.xml:
##
@@ -37,6 +37,8 @@
   
   
   
+  
+  

Review Comment:
   Right, it is not needed. I guess it was needed when that class was kept 
inside the core module.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-03 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1440285636


##
storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.List;
+
+public interface SegmentDeletionReason {

Review Comment:
   `SegmentDeletionReason` has other implementations like `RetentionMsBreach`, 
`RetentionSizeBreach`, `StartOffsetBreach` which use `UnifiedLog`. We can take 
a relook at this when we refactor `UnifiedLog`.
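
   For context, a rough, runnable Scala sketch of the coupling being described; the names
   and signatures below are illustrative stand-ins, not the actual Kafka interfaces:

   object SegmentDeletionReasonSketch {
     // Stand-in for the (still Scala) UnifiedLog that the reason implementations depend on.
     trait UnifiedLogLike { def name: String }

     trait SegmentDeletionReason {
       def logReason(log: UnifiedLogLike, toDelete: List[String]): Unit
     }

     // Hypothetical counterpart of RetentionMsBreach: it needs the log only to report what was deleted.
     object RetentionMsBreach extends SegmentDeletionReason {
       def logReason(log: UnifiedLogLike, toDelete: List[String]): Unit =
         println(s"Deleting ${toDelete.size} segment(s) from ${log.name} due to retention time breach")
     }

     def main(args: Array[String]): Unit =
       RetentionMsBreach.logReason(new UnifiedLogLike { def name = "topic-0" }, List("00000000000000000000.log"))
   }

   Until UnifiedLog itself moves, an interface shaped like this keeps a reference back into Scala code,
   which is why the relook is deferred to the UnifiedLog refactor.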



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-03 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1440207191


##
storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.List;
+
+public interface SegmentDeletionReason {

Review Comment:
   SegmentDeletionReason has other implementations like `RetentionMsBreach`, 
`RetentionSizeBreach`, `StartOffsetBreach` which use UnifiedLog. We can take a 
relook at this when we refactor UnifiedLog. 



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*  (if there is one). It returns true iff the segment is deletable.
* @return the segments ready to be deleted
*/
-  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], upperBoundOffset: Long): Boolean = {
-  val allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
+  private[log] def deletableSegments(predicate: (LogSegment, Optional[LogSegment]) => Boolean): Iterable[LogSegment] = {

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved 
to storage module.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
 localLog.checkIfMemoryMappedBufferClosed()
 // remove the segments for lookups
-localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
+localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason)

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved 
to storage module.



##
checkstyle/import-control-core.xml:
##
@@ -37,6 +37,8 @@
   
   
   
+  
+  

Review Comment:
   Right, it is not needed. I guess it was needed when that class was kept 
inside the core module. 



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
  private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved 
to storage module.
   



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
 def deleteProducerSnapshots(): Unit = {
   LocalLog.maybeHandleIOException(logDirFailureChannel,
 parentDir,
-s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir") {
+s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir", {
 snapshotsToDelete.foreach { snapshot =>
   snapshot.deleteIfExists()
 }
-  }
+  return;

Review Comment:
   The compiler was expecting a return for the inline declaration; it throws a type mismatch error without it.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -0,0 +1,1146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the 

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-01 Thread via GitHub


ijuma commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1439152043


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*  (if there is one). It returns true iff the segment is deletable.
* @return the segments ready to be deleted
*/
-  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], upperBoundOffset: Long): Boolean = {
-  val allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
+  private[log] def deletableSegments(predicate: (LogSegment, Optional[LogSegment]) => Boolean): Iterable[LogSegment] = {

Review Comment:
   We should change this to return `java.util.Collection[LogSegment]` to avoid 
unnecessary conversions.
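
   For illustration, a minimal, self-contained Scala sketch of the conversion cost being
   avoided here; the method names are hypothetical, not the actual UnifiedLog API:

   import java.util
   import scala.jdk.CollectionConverters._

   object ConversionSketch {
     // Returning a Scala collection forces an extra .asJava copy at every Java-facing call site.
     def deletableAsScala(): Iterable[String] = List("00000.log", "00010.log")

     // Returning a java.util.Collection lets Java code (such as LocalLog) consume it directly.
     def deletableAsJava(): util.Collection[String] = util.Arrays.asList("00000.log", "00010.log")

     def main(args: Array[String]): Unit = {
       val converted: util.List[String] = deletableAsScala().toList.asJava // conversion + copy
       val direct: util.Collection[String] = deletableAsJava()             // no conversion needed
       println(s"${converted.size()} vs ${direct.size()}")
     }
   }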



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
 localLog.checkIfMemoryMappedBufferClosed()
 // remove the segments for lookups
-localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
+localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason)

Review Comment:
   This conversion can be avoided if we make the change to the method signature 
suggested above.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1564,8 +1565,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
-def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-  nextSegmentOpt.exists(_.baseOffset <= (if (remoteLogEnabled()) localLogStartOffset() else logStartOffset))
+def shouldDelete(segment: LogSegment, nextSegmentOpt: Optional[LogSegment]): Boolean = {
+  if (nextSegmentOpt.isPresent)
+nextSegmentOpt.get().baseOffset <= (if (remoteLogEnabled()) localLogStartOffset() else logStartOffset)
+  else false

Review Comment:
   Nit: you can do `nextSegmentOpt.isPresent && nextSegmentOpt.get...`
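
   A tiny runnable Scala sketch of that form, with a hypothetical Segment stand-in
   (the real method takes a LogSegment and also consults remoteLogEnabled()):

   import java.util.Optional

   object OptionalNitSketch {
     final case class Segment(baseOffset: Long)

     // The suggested `isPresent && get` shape, instead of an if/else on the Optional.
     def shouldDelete(nextSegmentOpt: Optional[Segment], logStartOffset: Long): Boolean =
       nextSegmentOpt.isPresent && nextSegmentOpt.get().baseOffset <= logStartOffset

     def main(args: Array[String]): Unit = {
       println(shouldDelete(Optional.of(Segment(5L)), 10L))  // true
       println(shouldDelete(Optional.empty[Segment](), 10L)) // false
     }
   }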



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
  private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {

Review Comment:
   We should change this to return `java.util.Collection` or `java.util.List` 
to avoid unnecessary conversions.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
 def deleteProducerSnapshots(): Unit = {
   LocalLog.maybeHandleIOException(logDirFailureChannel,
 parentDir,
-s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir") {
+s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir", {
 snapshotsToDelete.foreach { snapshot =>
   snapshot.deleteIfExists()
 }
-  }
+  return;

Review Comment:
   Hmm, it is a bit odd that a `return` with no value is required for scala 
code. Is this right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-01 Thread via GitHub


ijuma commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1439091103


##
core/src/test/scala/unit/kafka/log/LocalLogTest.scala:
##
@@ -362,8 +364,8 @@ class LocalLogTest {
 }
 assertEquals(5, log.segments.numberOfSegments)
 assertNotEquals(10L, log.segments.activeSegment.baseOffset)
-val expected = log.segments.values.asScala.toVector
-val deleted = log.truncateFullyAndStartAt(10L)
+val expected = new util.ArrayList(log.segments.values)
+val deleted = StreamSupport.stream(log.truncateFullyAndStartAt(10L).spliterator(), false).collect(Collectors.toList())

Review Comment:
   This can be simplified if `truncateFullyAndStartAt` returns `Collection` 
instead of `Iterable`.
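
   For illustration, a self-contained Scala sketch contrasting the two return types;
   the truncate* method names here are hypothetical stand-ins, not the real API:

   import java.util
   import java.util.stream.{Collectors, StreamSupport}

   object TruncateReturnTypeSketch {
     def truncateReturningIterable(): java.lang.Iterable[String] = util.Arrays.asList("seg-0", "seg-1")
     def truncateReturningCollection(): util.Collection[String] = util.Arrays.asList("seg-0", "seg-1")

     def main(args: Array[String]): Unit = {
       // With Iterable, the caller has to go through a Stream just to materialise a List.
       val viaStream: util.List[String] =
         StreamSupport.stream(truncateReturningIterable().spliterator(), false).collect(Collectors.toList())
       // With Collection, the copy constructor is enough.
       val viaCopy = new util.ArrayList[String](truncateReturningCollection())
       println(s"${viaStream.size()} == ${viaCopy.size()}")
     }
   }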



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -0,0 +1,1146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.FileLogInputStream;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.Scheduler;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe.
+ */
+public class LocalLog {
+
+/**
+ * a file that is scheduled to be deleted
+ */
+public static final String DELETED_FILE_SUFFIX = LogFileUtils.DELETED_FILE_SUFFIX;
+
+/**
+ * A temporary file that is being used for log cleaning
+ */
+public static final String CLEANED_FILE_SUFFIX = ".cleaned";
+
+/**
+ * A temporary file used when swapping files into the log
+ */
+public static final String SWAP_FILE_SUFFIX = ".swap";
+
+/**
+ * a directory that is scheduled to be deleted
+ */
+public static final String DELETE_DIR_SUFFIX = "-delete";
+
+/**
+ * a directory that is used for future partition
+ */
+public static final String FUTURE_DIR_SUFFIX = "-future";
+public static final String STRAY_DIR_SUFFIX = "-stray";
+
+public static final Pattern DELETE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + DELETE_DIR_SUFFIX);
+public static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);
+public static final Pattern STRAY_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + STRAY_DIR_SUFFIX);
+
+public static final long UNKNOWN_OFFSET = -1L;
+
+private final Logger logger;
+
+private final LogSegments segments;
+private final Scheduler scheduler;
+private final Time time;
+private final TopicPartition topicPartition;
+private final LogDirFailureChannel logDirFailureChannel;
+
+// Last time the log was flushed
+private final AtomicLong lastFlushedTime;
+
+private 

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2023-11-18 Thread via GitHub


ijuma commented on PR #14034:
URL: https://github.com/apache/kafka/pull/14034#issuecomment-1817663128

   @satishd Let me know when this is ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2023-10-12 Thread via GitHub


ijuma commented on PR #14034:
URL: https://github.com/apache/kafka/pull/14034#issuecomment-1759964557

   @satishd Since we now have #14529, do you think you'll be able to pick this 
up again soon?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org