[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-04-05 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1158148779


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -147,6 +152,16 @@ private List removeWhileMatching(Iterator

[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-04-05 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1158149051


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -147,6 +152,16 @@ private List removeWhileMatching(Iterator

[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-04-03 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1155878999


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {

Review Comment:
   > I was wondering if it would be possible to copy the LeaderEpochCheckpointCache into an intermediate data structure (which doesn't have to be of type LeaderEpochCheckpointCache, hence decoupling it from the need to create a dummy InMemoryLeaderEpochCheckpoint ) and then using that intermediate data structure to extract the required Map (after whatever manipulation we want to do with it).
   
   That's a good idea. I've created https://issues.apache.org/jira/browse/KAFKA-14877 for this improvement.
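A minimal sketch of the decoupled approach suggested in the quote, assuming the cache can hand out a snapshot `List` of its entries (as `epochEntries()` does): the snapshot is copied into a plain `TreeMap` of epoch to start offset, which can then be trimmed and used wherever an epoch map is needed (for example, a segment's leader epochs in `RemoteLogSegmentMetadata`, which as we understand it takes a `Map<Integer, Long>`). `EpochEntry` below is a simplified stand-in for Kafka's class, and `EpochMaps.toEpochMap` is a hypothetical helper, not the design adopted in KAFKA-14877.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Simplified stand-in for Kafka's EpochEntry (leader epoch + start offset); assumed shape.
final class EpochEntry {
    final int epoch;
    final long startOffset;

    EpochEntry(int epoch, long startOffset) {
        this.epoch = epoch;
        this.startOffset = startOffset;
    }
}

final class EpochMaps {
    private EpochMaps() { }

    // Hypothetical helper: builds an intermediate epoch -> start-offset map from a
    // snapshot of the cache's entries. The result is a fresh TreeMap, so callers can
    // trim or edit it without touching the cache or creating a dummy checkpoint.
    static TreeMap<Integer, Long> toEpochMap(List<EpochEntry> snapshot) {
        return snapshot.stream()
                .collect(Collectors.toMap(
                        e -> e.epoch,
                        e -> e.startOffset,
                        (first, second) -> second, // later entry wins on a duplicate epoch
                        TreeMap::new));
    }
}
```

Because the map is a separate object, any manipulation (for example `headMap`/`tailMap` trimming) stays local to the caller.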



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-04-03 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1155876324


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,21 +398,25 @@ public void clear() {
 
     // Visible for testing
     public List<EpochEntry> epochEntries() {
-        lock.writeLock().lock();
+        lock.readLock().lock();
         try {
             return new ArrayList<>(epochs.values());
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
     }
 
-    private void flush() {
+    private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, Collection<EpochEntry> epochEntries) {
         lock.readLock().lock();
         try {
-            checkpoint.write(epochs.values());
+            leaderEpochCheckpoint.write(epochEntries);
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    private void flush() {
+        flushTo(this.checkpoint, epochs.values());
Review Comment:
   Good point about removing the lock in `flushTo`; removed.
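A minimal sketch of the arrangement after this change, assuming `flushTo` now relies on its caller to hold the lock: `flush()` takes the read lock once and delegates, and `flushTo` only checks the contract. `EpochCheckpoint`, `EpochCacheSketch` and `assign` are hypothetical, simplified names; the check uses the standard JDK methods `ReentrantReadWriteLock.getReadHoldCount()` and `isWriteLockedByCurrentThread()`.

```java
import java.util.Collection;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for LeaderEpochCheckpoint: just something that can be written to.
interface EpochCheckpoint {
    void write(Collection<Long> startOffsets);
}

// Simplified model of the cache: epoch -> start offset, guarded by a read/write lock.
final class EpochCacheSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();
    private final EpochCheckpoint checkpoint;

    EpochCacheSketch(EpochCheckpoint checkpoint) {
        this.checkpoint = checkpoint;
    }

    void assign(int epoch, long startOffset) {
        lock.writeLock().lock();
        try {
            epochs.put(epoch, startOffset);
            flush(); // a thread holding the write lock may also take the read lock
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Caller-facing flush: acquires the lock once, then delegates.
    private void flush() {
        lock.readLock().lock();
        try {
            flushTo(checkpoint, epochs.values());
        } finally {
            lock.readLock().unlock();
        }
    }

    // No locking here: the caller must already hold the read or write lock.
    private void flushTo(EpochCheckpoint target, Collection<Long> entries) {
        if (lock.getReadHoldCount() == 0 && !lock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("flushTo must be called with the lock held");
        }
        target.write(entries);
    }
}
```

Keeping the lock in one place avoids re-acquiring it on every nested call and makes the locking contract explicit.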



[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154310401


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,21 +398,25 @@ public void clear() {
 
     // Visible for testing
     public List<EpochEntry> epochEntries() {
-        lock.writeLock().lock();
+        lock.readLock().lock();
         try {
             return new ArrayList<>(epochs.values());
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
     }
 
-    private void flush() {
+    private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, Collection<EpochEntry> epochEntries) {
         lock.readLock().lock();
         try {
-            checkpoint.write(epochs.values());
+            leaderEpochCheckpoint.write(epochEntries);
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    private void flush() {
+        flushTo(this.checkpoint, epochs.values());
Review Comment:
   No, the second parameter is different. For the in-memory checkpoint, we need to "clone" the epoch values to avoid changing the inner entries.
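Why a copy matters here: `TreeMap.values()` is a live view backed by the map, so anything that keeps a reference to it sees later removals, whereas `new ArrayList<>(epochs.values())` is an independent snapshot. A small standalone demonstration, where the map stands in for the cache's `epochs` field and `LiveViewDemo` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;

final class LiveViewDemo {
    private LiveViewDemo() { }

    // Returns {size of the live view, size of the copy} after a removal on the map.
    static int[] sizesAfterRemoval() {
        TreeMap<Integer, Long> epochs = new TreeMap<>(); // epoch -> start offset
        epochs.put(0, 0L);
        epochs.put(1, 50L);
        epochs.put(2, 120L);

        Collection<Long> view = epochs.values();            // live view backed by the map
        List<Long> copy = new ArrayList<>(epochs.values()); // independent snapshot

        epochs.remove(2); // e.g. a truncation applied to the map afterwards

        return new int[] {view.size(), copy.size()};
    }
}
```

The view shrinks to 2 elements while the copy keeps all 3, which is why handing a snapshot (rather than the view) to a checkpoint that retains its input keeps the two sides independent.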



[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154193013


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,11 +396,11 @@ public void clear() {
 
     // Visible for testing
     public List<EpochEntry> epochEntries() {
-        lock.writeLock().lock();
+        lock.readLock().lock();

Review Comment:
   Opened another PR https://github.com/apache/kafka/pull/13483 to fix this bug.
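Assuming the bug in question is `epochEntries()` taking the write lock for what is only a read (it copies `epochs.values()`), the relevant JDK behaviour is that a `ReentrantReadWriteLock` read lock is shared while its write lock is exclusive. A small demonstration with a plain lock; `ReadLockDemo` is a hypothetical name:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class ReadLockDemo {
    private ReadLockDemo() { }

    // While this thread holds the read lock, another thread tries both locks.
    // Returns {otherThreadCouldRead, otherThreadCouldWrite}.
    static boolean[] probeWhileReading() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        AtomicBoolean couldRead = new AtomicBoolean();
        AtomicBoolean couldWrite = new AtomicBoolean();

        lock.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                if (lock.readLock().tryLock()) {  // shared: succeeds alongside our reader
                    couldRead.set(true);
                    lock.readLock().unlock();
                }
                if (lock.writeLock().tryLock()) { // exclusive: fails while a reader holds the lock
                    couldWrite.set(true);
                    lock.writeLock().unlock();
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.readLock().unlock();
        }
        return new boolean[] {couldRead.get(), couldWrite.get()};
    }
}
```

With the read lock, concurrent callers of a read-only method like `epochEntries()` no longer serialize behind each other, while writers are still excluded.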



[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154178753


##
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -219,6 +221,29 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
 }
   }
 
+  /**
+   * Returns the in memory leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
+   *
+   * @param log         The actual log from where to take the leader-epoch checkpoint
+   * @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
+   *                    If start offset is 6, then it will retain an entry at offset 6.
+   * @param endOffset   The end offset of the checkpoint file (inclusive in the truncation)
+   *                    If end offset is 100, then it will remove the entries greater than or equal to 100.
+   * @return the truncated leader epoch checkpoint
+   */
+  private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = {

Review Comment:
   Will remove it to avoid confusion.
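The start/end semantics described in the doc comment above can be illustrated on a plain in-memory map of epoch to start offset. This is a hypothetical sketch modelled on our reading of `LeaderEpochFileCache`'s `truncateFromStart`/`truncateFromEnd`, not the actual code: truncating from the start drops older epochs but keeps the one that still covers the start offset (moved up to that offset), and truncating from the end drops every epoch that starts at or after the end offset. `EpochTruncation` is an illustrative name.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; operates on a copy (epoch -> start offset), never on the real cache.
final class EpochTruncation {
    private EpochTruncation() { }

    // Remove every epoch whose start offset is >= endOffset.
    static void truncateFromEnd(TreeMap<Integer, Long> epochs, long endOffset) {
        epochs.values().removeIf(start -> start >= endOffset);
    }

    // Remove epochs starting at or before startOffset, except the latest of them,
    // which still covers startOffset and is moved up to begin exactly there.
    static void truncateFromStart(TreeMap<Integer, Long> epochs, long startOffset) {
        Integer keepEpoch = null;
        for (Map.Entry<Integer, Long> e : epochs.entrySet()) {
            if (e.getValue() > startOffset) {
                break; // ordered by epoch; start offsets grow with the epoch
            }
            keepEpoch = e.getKey();
        }
        if (keepEpoch == null) {
            return; // nothing starts at or before startOffset
        }
        epochs.headMap(keepEpoch, false).clear(); // drop the older epochs
        epochs.put(keepEpoch, startOffset);      // surviving epoch now starts at startOffset
    }
}
```

For example, starting from `{0:0, 1:5, 2:50, 3:100, 4:150}`, `truncateFromStart(6)` followed by `truncateFromEnd(100)` leaves `{1:6, 2:50}`: an entry is retained at offset 6, and the entries starting at 100 or later are removed.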



[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154178160


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -148,6 +151,16 @@ private List removeWhileMatching(Iterator

[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154172866


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {

Review Comment:
   @divijvaidya, sorry for not being clear, and for wasting your time trying to understand this PR. My bad!
   
   The motivation for introducing `InMemoryLeaderEpochCheckpoint` is to allow the remote log manager to create the `RemoteLogSegmentMetadata` (RLSM) with the correct leader epoch info for a specific segment. To do that, we need to rely on the `LeaderEpochFileCache` to truncate from the start and the end to get the epoch info. However, we don't want to truncate the epochs in the cache itself (which would eventually be written to the checkpoint file). So we introduce `InMemoryLeaderEpochCheckpoint` to feed into `LeaderEpochFileCache`: when we truncate the epochs for the RLSM, we can do it in memory, without affecting the checkpoint file and without interacting with the file system. Does that make sense?
   
   I'll update the PR description and remove the methods that are not currently used.
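The idea can be sketched with simplified stand-ins: a checkpoint that keeps entries only in memory, so "flushing" a truncated cache never reaches the file system. `LeaderEpochCheckpointSketch`, `InMemoryCheckpointSketch` and this `EpochEntry` are hypothetical; the write/read pair is our assumption about the shape of `LeaderEpochCheckpoint`. The reviewed class's imports (`ByteArrayOutputStream`, `ByteBuffer`, `CheckpointFile`) suggest it can also serialize its entries, which is omitted here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for Kafka's EpochEntry; assumed shape.
final class EpochEntry {
    final int epoch;
    final long startOffset;

    EpochEntry(int epoch, long startOffset) {
        this.epoch = epoch;
        this.startOffset = startOffset;
    }
}

// Assumed, simplified checkpoint contract: persist a collection of entries, read them back.
interface LeaderEpochCheckpointSketch {
    void write(Collection<EpochEntry> epochs);

    List<EpochEntry> read();
}

// Keeps entries in memory only: writes never touch the file system, so a cache
// built on top of it can be truncated without affecting the on-disk checkpoint.
final class InMemoryCheckpointSketch implements LeaderEpochCheckpointSketch {
    private List<EpochEntry> epochs = Collections.emptyList();

    @Override
    public synchronized void write(Collection<EpochEntry> epochs) {
        // Defensive copy: later changes to the caller's collection are not seen here.
        this.epochs = Collections.unmodifiableList(new ArrayList<>(epochs));
    }

    @Override
    public synchronized List<EpochEntry> read() {
        return epochs;
    }
}
```

A cache seeded from the real entries and backed by such a checkpoint can then run its usual truncate-and-flush logic, with every flush landing only in this object.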


