[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1158148779 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -147,6 +152,16 @@ private List removeWhileMatching(Iterator
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1158149051 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -147,6 +152,16 @@ private List removeWhileMatching(Iterator
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1155878999

## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {

Review Comment:
> I was wondering if it would be possible to copy the LeaderEpochCheckpointCache into an intermediate data structure (which doesn't have to be of type LeaderEpochCheckpointCache, hence decoupling it from the need to create a dummy InMemoryLeaderEpochCheckpoint) and then using that intermediate data structure to extract the required Map (after whatever manipulation we want to do with it).

That's a good idea. https://issues.apache.org/jira/browse/KAFKA-14877 has been created for this improvement.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1155876324

## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,21 +398,25 @@ public void clear() {
     // Visible for testing
     public List<EpochEntry> epochEntries() {
-        lock.writeLock().lock();
+        lock.readLock().lock();
         try {
             return new ArrayList<>(epochs.values());
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
     }

-    private void flush() {
+    private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, Collection<EpochEntry> epochEntries) {
         lock.readLock().lock();
         try {
-            checkpoint.write(epochs.values());
+            leaderEpochCheckpoint.write(epochEntries);
         } finally {
             lock.readLock().unlock();
         }
     }

+    private void flush() {
+        flushTo(this.checkpoint, epochs.values());

Review Comment:
Good point about removing the lock in `flushTo`; removed.
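To illustrate the locking change in this diff: a read-only snapshot such as `epochEntries()` only needs the read lock, so concurrent readers do not block one another, while mutations still take the write lock. The following is a minimal sketch under stated assumptions, not the Kafka implementation: `EpochSnapshotSketch`, `assign`, and `startOffsets` are hypothetical names, and the epoch-to-start-offset map is a simplification of the real cache.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for an epoch cache guarded by a read/write lock.
class EpochSnapshotSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Assumption for illustration: leader epoch -> start offset.
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();

    // Mutation: takes the write lock, which excludes readers and other writers.
    void assign(int epoch, long startOffset) {
        lock.writeLock().lock();
        try {
            epochs.put(epoch, startOffset);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Read-only snapshot: the read lock is enough, and several readers may hold it at once.
    List<Long> startOffsets() {
        lock.readLock().lock();
        try {
            // Copy while holding the lock so the caller gets a stable snapshot.
            return new ArrayList<>(epochs.values());
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Because the snapshot is copied under the lock, later calls to `assign` do not change a list that was already returned.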
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154310401

## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,21 +398,25 @@ public void clear() {
     // Visible for testing
     public List<EpochEntry> epochEntries() {
-        lock.writeLock().lock();
+        lock.readLock().lock();
         try {
             return new ArrayList<>(epochs.values());
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
     }

-    private void flush() {
+    private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, Collection<EpochEntry> epochEntries) {
         lock.readLock().lock();
         try {
-            checkpoint.write(epochs.values());
+            leaderEpochCheckpoint.write(epochEntries);
         } finally {
             lock.readLock().unlock();
         }
     }

+    private void flush() {
+        flushTo(this.checkpoint, epochs.values());

Review Comment:
No, the second parameter is different. For the in-memory one, we need to "clone" the epoch values to avoid changing the inner entries.
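The reason a "clone" matters can be sketched as follows. In Java, `Map.values()` returns a live view of the map, so a collection handed to another component keeps reflecting later changes to the map, whereas a copy is frozen at the moment it was taken. This is an illustrative sketch only; `ValuesViewSketch` and `demo` are hypothetical names, and the map contents are made up.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;

// Hypothetical demo: live view of map values versus a defensive copy.
class ValuesViewSketch {
    /** Returns {size of the live view, size of the copy} after one entry is removed from the map. */
    static int[] demo() {
        // Assumption for illustration: leader epoch -> start offset.
        TreeMap<Integer, Long> epochs = new TreeMap<>();
        epochs.put(0, 0L);
        epochs.put(1, 50L);
        epochs.put(2, 120L);

        Collection<Long> liveView = epochs.values();        // backed by the map
        List<Long> copy = new ArrayList<>(epochs.values()); // independent snapshot

        epochs.remove(2); // simulate a truncation on the original map

        // The view shrinks together with the map; the copy keeps all three entries.
        return new int[] {liveView.size(), copy.size()};
    }
}
```

Passing the copy to an in-memory checkpoint keeps subsequent truncation of the scratch data from touching the entries held by the original cache.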
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154193013

## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,11 +396,11 @@ public void clear() {
     // Visible for testing
     public List<EpochEntry> epochEntries() {
-        lock.writeLock().lock();
+        lock.readLock().lock();

Review Comment:
Opened another PR https://github.com/apache/kafka/pull/13483 to fix this bug.
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154178753

## core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -219,6 +221,29 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
     }
   }

+  /**
+   * Returns the in memory leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
+   *
+   * @param log         The actual log from where to take the leader-epoch checkpoint
+   * @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
+   *                    If start offset is 6, then it will retain an entry at offset 6.
+   * @param endOffset   The end offset of the checkpoint file (inclusive in the truncation)
+   *                    If end offset is 100, then it will remove the entries greater than or equal to 100.
+   * @return the truncated leader epoch checkpoint
+   */
+  private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = {

Review Comment:
Will remove it to avoid confusion.
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1154178160 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -148,6 +151,16 @@ private List removeWhileMatching(Iterator removeWhileMatching(Iterator
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154172866

## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {

Review Comment:
@divijvaidya, sorry for not being clear and for wasting your time trying to understand this PR. My bad!

The motivation for introducing `InMemoryLeaderEpochCheckpoint` is to allow the remote log manager to create the `RemoteLogSegmentMetadata` (RLSM) with the correct leader epoch info for a specific segment. To do that, we need to rely on the `LeaderEpochCheckpointCache` to truncate from the start and from the end to get the epoch info. However, we don't actually want to truncate the epochs in the cache (and write them to the checkpoint file in the end). So we introduce this `InMemoryLeaderEpochCheckpoint` to feed into `LeaderEpochCheckpointCache`; when we truncate the epochs for the RLSM, we can do it in memory without affecting the checkpoint file and without interacting with the file system.

Does that make sense?

I'll update the PR description and remove the methods that are not currently used.
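The idea described in this comment can be sketched roughly as below: copy the cache's entries into a second cache that is backed by an in-memory checkpoint, then truncate only that copy. This is a simplified illustration under stated assumptions, not the code in the PR. All type and method names (`EpochEntrySketch`, `CheckpointSketch`, `InMemoryCheckpointSketch`, `EpochCacheSketch`, `inMemoryCopy`) are hypothetical, and the truncation rule shown (drop entries whose start offset is greater than or equal to the end offset) is only one plausible reading of the description.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;

// Hypothetical immutable entry: a leader epoch and the offset where it starts.
final class EpochEntrySketch {
    final int epoch;
    final long startOffset;
    EpochEntrySketch(int epoch, long startOffset) {
        this.epoch = epoch;
        this.startOffset = startOffset;
    }
}

// Hypothetical checkpoint abstraction: somewhere the cache persists its entries.
interface CheckpointSketch {
    void write(Collection<EpochEntrySketch> entries);
    List<EpochEntrySketch> read();
}

// In-memory variant: keeps a private copy and never touches the file system.
final class InMemoryCheckpointSketch implements CheckpointSketch {
    private List<EpochEntrySketch> entries = new ArrayList<>();
    public void write(Collection<EpochEntrySketch> e) { entries = new ArrayList<>(e); }
    public List<EpochEntrySketch> read() { return new ArrayList<>(entries); }
}

// Simplified cache that flushes to its checkpoint after every change.
final class EpochCacheSketch {
    private final CheckpointSketch checkpoint;
    private final TreeMap<Integer, EpochEntrySketch> epochs = new TreeMap<>();

    EpochCacheSketch(CheckpointSketch checkpoint) {
        this.checkpoint = checkpoint;
        for (EpochEntrySketch e : checkpoint.read()) epochs.put(e.epoch, e);
    }

    void assign(int epoch, long startOffset) {
        epochs.put(epoch, new EpochEntrySketch(epoch, startOffset));
        checkpoint.write(epochs.values());
    }

    // Assumed rule for illustration: drop entries starting at or after endOffset.
    void truncateFromEnd(long endOffset) {
        epochs.values().removeIf(e -> e.startOffset >= endOffset);
        checkpoint.write(epochs.values());
    }

    List<EpochEntrySketch> entries() { return new ArrayList<>(epochs.values()); }

    // Scratch copy backed by a fresh in-memory checkpoint; truncating it leaves this cache alone.
    EpochCacheSketch inMemoryCopy() {
        InMemoryCheckpointSketch scratch = new InMemoryCheckpointSketch();
        scratch.write(epochs.values());
        return new EpochCacheSketch(scratch);
    }
}
```

With this shape, a caller can take `inMemoryCopy()`, truncate it to the segment's offset range, and read the remaining entries, while the original cache and its checkpoint stay unchanged.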