[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782275#comment-17782275 ]

Divij Vaidya commented on KAFKA-15609:
--

Ah! Thank you [~ocadaruma]. Thank you [~adupriez]. This fills a critical gap in my knowledge and I am grateful for the references that you have shared. I guess it is time for me to correct my understanding of the Linux kernel.

To conclude: a memory mapping and the page cache for the same file share the same physical memory. Hence, a change made through the mmap is visible to file I/O that goes through the page cache, and vice versa (assuming a shared mapping and non-direct reads/writes).

> Corrupted index uploaded to remote tier
> ---
>
> Key: KAFKA-15609
> URL: https://issues.apache.org/jira/browse/KAFKA-15609
> Project: Kafka
> Issue Type: Bug
> Components: Tiered-Storage
> Affects Versions: 3.6.0
> Reporter: Divij Vaidya
> Priority: Minor
>
> While testing Tiered Storage, we have observed corrupt indexes being present
> in remote tier. One such situation is covered here at
> https://issues.apache.org/jira/browse/KAFKA-15401. This Jira presents another
> such possible case of corruption.
> Potential cause of index corruption:
> We want to ensure that the file we are passing to RSM plugin contains all the
> data which is present in MappedByteBuffer i.e. we should have flushed the
> MappedByteBuffer to the file using force(). In Kafka, when we close a
> segment, indexes are flushed asynchronously [1]. Hence, it might be possible
> that when we are passing the file to RSM, the file doesn't contain flushed
> data. Hence, we may end up uploading indexes which haven't been flushed yet.
> Ideally, the contract should enforce that we force flush the content of
> MappedByteBuffer before we give the file for RSM. This will ensure that
> indexes are not corrupted/incomplete.
> [1]
> [https://github.com/apache/kafka/blob/4150595b0a2e0f45f2827cebc60bcb6f6558745d/core/src/main/scala/kafka/log/UnifiedLog.scala#L1613]

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782229#comment-17782229 ]

Alexandre Dupriez commented on KAFKA-15609:
---

The nature (private or shared) of a memory mapping has visibility implications between processes, but within the same process consistency should always be guaranteed. "Flushing" a memory-mapped file to the block device can be initiated with the {{msync}} syscall, but that operation is not necessary for the visibility guarantees which are questioned in this ticket.

A succinct description of memory mapping can be found in {_}Understanding the Linux Kernel, Third Edition{_}, O'Reilly, pages 657-668.
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1778#comment-1778 ]

Haruki Okada commented on KAFKA-15609:
--

I added [MmapTest3.java|https://gist.github.com/ocadaruma/fc26fc122829c63cb61e14d7fc96896d] and confirmed that reads via read() and writes via mmap are consistent.

> Would you happen to have some reference that I can read about this?

I failed to find a good web reference, but the book "The Linux Programming Interface 2nd edition" mentions this. Excerpt:

{quote}Like many other modern UNIX implementations, Linux provides a so-called unified virtual memory system. This means that, where possible, memory mappings and blocks of the buffer cache share the same pages of physical memory. Thus, the views of a file obtained via a mapping and via I/O system calls (read(), write(), and so on) are always consistent, and the only use of msync() is to force the contents of a mapped region to be flushed to disk.{quote}
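The check described in the comment above can be sketched in a few lines of Java. This is a minimal illustration written for this thread, not a copy of the linked gist: the class name `MmapReadVisibility` and its method are hypothetical. It writes a value through a `MappedByteBuffer` obtained from `FileChannel.map`, never calls `force()`, and then reads the same bytes back with an ordinary `FileChannel.read`. The consistency it demonstrates is the Linux behaviour described in the excerpt; other platforms are assumed, not verified, to behave the same way.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class MmapReadVisibility {

    /** Writes a long through a shared mapping, then reads it back with read(); force() is never called. */
    static long writeViaMmapReadViaChannel(long value) {
        try {
            Path file = Files.createTempFile("mmap-visibility", ".idx");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // READ_WRITE yields a shared mapping; mapping 8 bytes extends the empty file.
                MappedByteBuffer mmap = ch.map(FileChannel.MapMode.READ_WRITE, 0, Long.BYTES);
                mmap.putLong(0, value); // write through the mapping only

                // Read the same region through the regular (buffered, non-direct) I/O path.
                ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
                while (buf.hasRemaining() && ch.read(buf, buf.position()) >= 0) {
                    // keep reading until the buffer is full or end of file is reached
                }
                buf.flip();
                return buf.getLong();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeViaMmapReadViaChannel(42L));
    }
}
```

On a Linux system with a unified page cache, the value returned by the read is expected to be the value written through the mapping, even though nothing has been flushed to the storage device.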
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782210#comment-17782210 ]

Divij Vaidya commented on KAFKA-15609:
--

Thank you for clarifying [~ocadaruma] and thank you for the quick tests. We actually want to test the condition where we write to an mmap but read using (non-direct) file I/O. We want to validate that even if the mmap may not have been flushed, the data read via file I/O is consistent. Would you have time to quickly test that?

> Also, in my understanding, page cache is directly mapped to the mmap area so
> even when we try to read the file with ordinary read() call which is written
> by mmap, the content should be consistent.

This is the point about which I am not able to find any concrete implementation. Would you happen to have some reference that I can read about this?
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782194#comment-17782194 ]

Haruki Okada commented on KAFKA-15609:
--

[~divijvaidya] Right, when we call mmap twice, the two mappings will be placed at different virtual addresses. However, as long as we use FileChannel.map, a write to one mmap is guaranteed to be visible to the other mmap because they will be mapped with the MAP_SHARED flag.

https://github.com/adoptium/jdk11u/blob/jdk-11.0.21%2B7/src/java.base/unix/native/libnio/ch/FileChannelImpl.c#L88
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782202#comment-17782202 ]

Haruki Okada commented on KAFKA-15609:
--

I validated the MappedByteBuffer behavior with this Java code: [https://gist.github.com/ocadaruma/fc26fc122829c63cb61e14d7fc96896d]

When we create two mmaps from the same file, writes to the first one are always visible to the second one unless we specify MapMode.PRIVATE.
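The two-mapping experiment described above can be approximated as follows. This is a hedged sketch, not the contents of the linked gist: the class `TwoMappings` and its method are hypothetical names. It maps the same file twice, writes through the first mapping in a caller-chosen `MapMode`, and reports whether the second (shared, read-only) mapping observes the write. The expected results assume Linux semantics for `MAP_SHARED` and `MAP_PRIVATE`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TwoMappings {

    /** Returns true if a write made through a mapping opened in writerMode is seen by a second mapping. */
    static boolean visibleToSecondMapping(FileChannel.MapMode writerMode) {
        try {
            Path file = Files.createTempFile("two-mappings", ".idx");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Pre-size the file with 8 zero bytes so both mappings cover real data.
                ch.write(ByteBuffer.allocate(Long.BYTES), 0);

                MappedByteBuffer writer = ch.map(writerMode, 0, Long.BYTES);
                MappedByteBuffer reader = ch.map(FileChannel.MapMode.READ_ONLY, 0, Long.BYTES);

                writer.putLong(0, 0xCAFEL);          // no force() anywhere
                return reader.getLong(0) == 0xCAFEL; // did the second mapping see it?
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("READ_WRITE visible: " + visibleToSecondMapping(FileChannel.MapMode.READ_WRITE));
        System.out.println("PRIVATE visible:    " + visibleToSecondMapping(FileChannel.MapMode.PRIVATE));
    }
}
```

With `MapMode.READ_WRITE` the write is expected to be visible through the second mapping; with `MapMode.PRIVATE` the write goes to a copy-on-write page and is expected not to be.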
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782179#comment-17782179 ]

Divij Vaidya commented on KAFKA-15609:
--

[~ijuma] No, my understanding (which might be incorrect, I am a bit naive in this area) is as follows.

RAM/physical memory is logically divided into pages by the OS. Each process is allocated a range of these pages called the virtual address space (VAS). The VAS logically abstracts away the fragmented nature of physical memory from the process.

mmap() reserves a range of this virtual address space and maps an underlying file to it. "Maps an underlying file" means that a portion of the file is read into physical memory by the OS and a pointer is returned by mmap (this pointer can be shared across processes for IPC using the same physical memory). The OS now manages reading the underlying storage and loading portions of the file as needed into physical memory. Any change made through the mmap is actually performed on the physical memory space reserved by the mmap.

The page cache is a range of pages in physical memory which can be used by any application. File I/O is one of the applications using the page cache, such that any (non-direct) writes/reads to the OS go via the page cache.

I think that when the OS reads part of a file into the physical memory reserved by the mmap, it reads it into a different space than the one occupied by the page cache. Hence, writes to the mmap are writes to physical memory but not to the page cache. This can be validated by creating two different mmaps for the same file and verifying whether they access the same entry in physical memory or not. As per my assumption they should not, since they should each access their own reserved space in physical memory.
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782160#comment-17782160 ]

Ismael Juma commented on KAFKA-15609:
-

[~divijvaidya] the indexes write immediately to page cache, right? `flush` forces it to _disk_, not to page cache.
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782148#comment-17782148 ]

Divij Vaidya commented on KAFKA-15609:
--

Hey folks

I did some more digging on this and I am not convinced that this is not a bug, but I don't have proof right now.

My understanding is as follows:
1. MMap will read/write from the page cache in the OS.
2. MMap flushes data to the page cache "eventually" after writes. Note that the MappedByteBuffer.force() method forces a flush of the MMap to the page cache.
3. In Kafka, we flush indexes, leading to MappedByteBuffer.force(), on segment roll, but this is performed asynchronously in a separate thread.

Now consider the situation:
1. Data is written to the segment and index.
2. The segment rolls and schedules a flush.
3. Since the segment is closed, RSM will consider it for copying to remote.
4. RSM will read from the page cache, which is guaranteed to give correct (even non-flushed / dirty) data for the segment, but it may give out-of-date data for indexes.

Result: Indexes uploaded to the remote tier will not contain all the data that is expected in them.

I will keep this ticket closed until I have some reproducer, but I would solicit your thoughts on this.

Proposal: To fix this, we should call AbstractIndex.flush() in RLM before passing the index to RSM for upload.
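The proposal in the comment above amounts to "force the mapping, then hand the file over". A minimal, self-contained Java sketch of that ordering follows. It is not Kafka's RLM or RSM code: `FlushBeforeUpload`, `flushThenUpload`, and the `uploader` callback are hypothetical names used only for illustration, and the "upload" here is simply reading the file back. The only JDK behaviour relied on is `MappedByteBuffer.force()`, which writes the mapped region's changes to the storage device.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Function;

class FlushBeforeUpload {

    /** Hypothetical hand-off step: force the mapped index first, then pass the path to the uploader. */
    static <T> T flushThenUpload(MappedByteBuffer index, Path indexFile, Function<Path, T> uploader) {
        index.force(); // synchronous flush of the mapping to the storage device
        return uploader.apply(indexFile);
    }

    /** Stand-in for a remote upload: just reads the whole file through regular file I/O. */
    static byte[] readAll(Path p) {
        try {
            return Files.readAllBytes(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes one entry through a mapping, performs the hand-off, and returns what the uploader saw. */
    static long demo(long entry) {
        try {
            Path file = Files.createTempFile("index-handoff", ".idx");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                MappedByteBuffer index = ch.map(FileChannel.MapMode.READ_WRITE, 0, Long.BYTES);
                index.putLong(0, entry);
                byte[] uploaded = flushThenUpload(index, file, FlushBeforeUpload::readAll);
                return ByteBuffer.wrap(uploaded).getLong();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(7L));
    }
}
```

As other comments in this thread point out, the explicit `force()` matters for durability on the storage device; whether it is needed for the uploader to see the latest bytes is exactly the question under discussion here.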
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1186#comment-1186 ]

Divij Vaidya commented on KAFKA-15609:
--

Hey Luke

The corruption we observed occurred during disk-full or process-crash scenarios. The ticket and corresponding fix at https://issues.apache.org/jira/browse/KAFKA-15401 by my co-worker should be sufficient to address that problem. We don't have complete data, but from what we have, the exact nature of the corruption is data being truncated/missing from the index, leading to failures during sanityCheck().

Also, we have another ticket, https://issues.apache.org/jira/browse/KAFKA-15612, to decide on whether we need to flush the index or not. The above conversation has been inconclusive because we have conflicting opinions on whether mmap needs a flush or not (we all agree that user data files don't need a flush since the OS provides a page cache read-after-write guarantee).

We can close this one and continue the discussion in the ticket I linked here.
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1171#comment-1171 ]

Luke Chen commented on KAFKA-15609:
---

[~divijvaidya], do you know where the index file was corrupted? At the end of the file? Do you have any more info on this, if we now think it is not related to the "unflushed" data?

I just ran a test that delays flushing data when rolling the segments and, during the delay, makes sure the log segment and index files are uploaded to remote storage. In this case, the client can still consume the data correctly.

So, I'm wondering what we're going to do with this ticket. More info about the issue should help. If not, I think we can close this and re-open it if we encounter a similar issue and have more info. WDYT?
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775965#comment-17775965 ]

Haruki Okada commented on KAFKA-15609:
--

> is the OS intelligent enough to understand that it should provide a "dirty" /
> non-flushed view of the file to the second thread as well?

As Ismael pointed out, all file operations go through the page cache (direct I/O is the exception, but that isn't the case in Kafka), so uploading an unflushed index to the remote storage shouldn't be an issue.
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775964#comment-17775964 ]

Satish Duggana commented on KAFKA-15609:

These memory-mapped operations are generally OS/platform dependent. I prefer to have a defined contract at the usage level rather than depending on any assumptions. In this case, we can have a contract for RLM to make sure these files are flushed to disk before the file paths are given to RSM to read this data and write it into remote storage.
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775961#comment-17775961 ]

Satish Duggana commented on KAFKA-15609:

[~divijvaidya] How are index files read in your RSM implementation to copy them to remote storage?

https://issues.apache.org/jira/browse/KAFKA-15612 was filed based on another discussion related to the issue mentioned in this JIRA.
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775761#comment-17775761 ]

Ismael Juma commented on KAFKA-15609:
-

If you use buffered I/O (not direct I/O), you still go via the page cache. But if you do not fsync, it's possible for the disk state to diverge from the uploaded state after a crash (for example).
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775737#comment-17775737 ]

Divij Vaidya commented on KAFKA-15609:

> So you mean, when a segment is rolled, before the data is flushed, the RSM is
> copying the incomplete segment/index into the remote tier, right?

Yes. For a log segment, this might not be a problem because the RLM plugin will read the log segment from the page cache even if it hasn't been fsync()'ed. I am assuming the OS is intelligent enough to ensure that multiple threads read the same dirty copy of the data from the page cache.

But index files are stored in a MappedByteBuffer (backed by mmapped files), and we read them from two different threads. One thread (the request handler thread) accesses the file via the mmap interface, which provides a consistent view. The other thread, the RLM plugin thread, reads the file via regular OS file I/O without going through the mmap interface. In such cases, is the OS intelligent enough to understand that it should provide a "dirty" / non-flushed view of the file to the second thread as well?

I don't have an answer, and we may need some research to establish whether this could be a problem. I will wait for [~ocadaruma] to comment here since they seem to be an expert on the impact of fsync, mmap and page cache optimizations.
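One way to probe the question raised above is a small experiment: write through a shared mapping, skip force(), and read the same file through regular file I/O. The sketch below is an assumption-laden illustration (the class `MmapVisibilityCheck` and its method name are hypothetical, and it runs in a single thread for simplicity). The Java API leaves this behaviour OS dependent, so treat the result as an observation about the machine it runs on rather than a guarantee.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MmapVisibilityCheck {
    // Writes `value` through a shared mapping and reads it back through
    // regular file I/O, deliberately without calling force() in between.
    static long writeViaMmapReadViaFile(Path file, long value) throws IOException {
        try (FileChannel channel = FileChannel.open(
                file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // READ_WRITE corresponds to a shared mapping; mapping 8 bytes extends an empty file.
            MappedByteBuffer mmap = channel.map(FileChannel.MapMode.READ_WRITE, 0, Long.BYTES);
            mmap.putLong(0, value);
            // No mmap.force() here: the data may still be dirty in the page cache.
            byte[] viaFileIo = Files.readAllBytes(file);
            return ByteBuffer.wrap(viaFileIo).getLong(0);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("offset", ".index");
        long written = 0x1122334455667788L;
        long read = writeViaMmapReadViaFile(file, written);
        System.out.println("file I/O sees the mmap write: " + (read == written));
    }
}
```

On Linux, where a shared mapping and buffered file I/O are served from the same page cache pages, one would expect the two values to match, which is consistent with the conclusion reached elsewhere in this thread.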
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775725#comment-17775725 ]

Luke Chen commented on KAFKA-15609:

Wow, another edge case! Nice find!

So you mean, when a segment is rolled, before the data is flushed, the RSM is copying the incomplete segment/index into the remote tier, right?

Agreed, we should force flush before copying data. I've checked: if we flush the same offset multiple times, only the first call acquires the lock and does a real flush operation, so the extra calls won't add CPU overhead.
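The "only the first flush does real work" behaviour described above can be illustrated with a minimal sketch. This is not Kafka's implementation: the class `FlushOnce`, the `recoveryPoint` field and the `realFlushes` counter are hypothetical names chosen for the example, and the real flush is replaced by a counter increment.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class FlushOnce {
    private final Object lock = new Object();
    // Highest offset known to be flushed (hypothetical stand-in for a recovery point).
    private volatile long recoveryPoint = 0L;
    // Counts how many times the expensive flush path actually ran.
    final AtomicInteger realFlushes = new AtomicInteger();

    void flush(long offset) {
        // Cheap check first: nothing new to flush, so return without taking the lock.
        if (offset <= recoveryPoint) {
            return;
        }
        synchronized (lock) {
            // Re-check under the lock in case another thread flushed in the meantime.
            if (offset > recoveryPoint) {
                realFlushes.incrementAndGet(); // stand-in for the real force() calls
                recoveryPoint = offset;
            }
        }
    }

    public static void main(String[] args) {
        FlushOnce log = new FlushOnce();
        log.flush(10);
        log.flush(10);
        log.flush(10);
        System.out.println("real flushes: " + log.realFlushes.get());
    }
}
```

With a guard like this, an extra flush call before handing files to the RSM costs a single volatile read when the data is already flushed.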
[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier
[ https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775687#comment-17775687 ]

Divij Vaidya commented on KAFKA-15609:

[~satishd] [~showuon] thoughts?