[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-11-02 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782275#comment-17782275
 ] 

Divij Vaidya commented on KAFKA-15609:
--

Ah! Thank you [~ocadaruma] . Thank you [~adupriez] . This fills a critical gap 
in my knowledge and I am grateful for the references that you have shared. I 
guess it is time for me to correct my understanding of the Linux kernel. 

To conclude:
A memory-mapped region and the page cache for the same file share the same 
physical memory. Hence, a change made through the mmap is visible to file I/O 
that goes through the page cache, and vice versa (assuming a shared mapping 
and non-direct reads/writes).
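
The conclusion above can be checked with a small experiment. The following is a minimal sketch, not the gist referenced elsewhere in this thread: the class and method names are hypothetical, and it assumes Linux and buffered (non-direct) I/O. It writes through a shared mapping without calling force() and then reads the same bytes back with an ordinary FileChannel.read().

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapReadConsistency {

    /**
     * Writes {@code value} through a shared mapping WITHOUT calling force(),
     * then reads the same offset back with a plain positional read().
     */
    public static long writeViaMmapReadViaChannel(Path file, long value) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // READ_WRITE corresponds to a shared mapping; the file is extended to 8 bytes.
            MappedByteBuffer mmap = channel.map(FileChannel.MapMode.READ_WRITE, 0, Long.BYTES);
            mmap.putLong(0, value); // the page is now dirty; nothing forces it to disk

            ByteBuffer readBuf = ByteBuffer.allocate(Long.BYTES);
            while (readBuf.hasRemaining()) {
                if (channel.read(readBuf, readBuf.position()) < 0) {
                    break; // unexpected end of file
                }
            }
            readBuf.flip();
            return readBuf.getLong();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mmap-read", ".index");
        try {
            System.out.println("read() saw: " + writeViaMmapReadViaChannel(file, 42L));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

On a system with a unified page cache, the value returned by read() is expected to match the value written through the mapping, even though the mapping was never flushed.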

> Corrupted index uploaded to remote tier
> ---
>
> Key: KAFKA-15609
> URL: https://issues.apache.org/jira/browse/KAFKA-15609
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Priority: Minor
>
> While testing Tiered Storage, we have observed corrupt indexes being present 
> in remote tier. One such situation is covered here at 
> https://issues.apache.org/jira/browse/KAFKA-15401. This Jira presents another 
> such possible case of corruption.
> Potential cause of index corruption:
> We want to ensure that the file we are passing to RSM plugin contains all the 
> data which is present in MemoryByteBuffer i.e. we should have flushed the 
> MemoryByteBuffer to the file using force(). In Kafka, when we close a 
> segment, indexes are flushed asynchronously [1]. Hence, it might be possible 
> that when we are passing the file to RSM, the file doesn't contain flushed 
> data. Hence, we may end up uploading indexes which haven't been flushed yet. 
> Ideally, the contract should enforce that we force flush the content of 
> MemoryByteBuffer before we give the file for RSM. This will ensure that 
> indexes are not corrupted/incomplete.
> [1] 
> [https://github.com/apache/kafka/blob/4150595b0a2e0f45f2827cebc60bcb6f6558745d/core/src/main/scala/kafka/log/UnifiedLog.scala#L1613]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-11-02 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782229#comment-17782229
 ] 

Alexandre Dupriez commented on KAFKA-15609:
---

The nature - private or shared - of a memory mapping has visibility 
implications between processes, but within the same process consistency 
should always be guaranteed. 

"Flushing" a memory-mapped file to the block device can be initiated with the 
{{msync}} syscall but that operation is not necessary for the visibility 
guarantees which are questioned in this ticket.

A succinct description of memory mapping can be found in {_}Understanding 
the Linux Kernel, Third Edition{_}, O'Reilly, pages 657-668.



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-11-02 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1778#comment-1778
 ] 

Haruki Okada commented on KAFKA-15609:
--

I added 
[MmapTest3.java|https://gist.github.com/ocadaruma/fc26fc122829c63cb61e14d7fc96896d]
 and confirmed that reads via read() and writes via mmap are consistent.

 

> Would you happen to have some reference that I can read about this?

 

I failed to find a good web reference, but the book "The Linux Programming 
Interface 2nd edition" mentions this.

excerpt:
{quote}Like many other modern UNIX implementations, Linux provides a so-called 
unified virtual memory system. This means that, where possible, memory mappings 
and blocks of the buffer cache share the same pages of physical memory. Thus, 
the views of a file obtained via a mapping and via I/O system calls (read(), 
write(), and so on) are always consistent, and the only use of msync() is to 
force the contents of a mapped region to be flushed to disk.{quote}



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-11-02 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782210#comment-17782210
 ] 

Divij Vaidya commented on KAFKA-15609:
--

Thank you for clarifying [~ocadaruma] and thank you for the quick tests. We 
actually want to test the condition where we write to a MMap but read using 
(non direct) File I/O. We want to validate that even if MMap may not have been 
flushed, the data read via File I/O is consistent. Would you have time to 
quickly test that?

> Also, in my understanding, page cache is directly mapped to the mmap area so 
> even when we try to read the file with ordinary read() call which is written 
> by mmap, the content should be consistent.

This is the point about which I am not able to find any concrete 
implementation. Would you happen to have some reference that I can read about 
this?



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-11-02 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782194#comment-17782194
 ] 

Haruki Okada commented on KAFKA-15609:
--

[~divijvaidya]

Right, when we call mmap twice, the two mappings get different virtual 
addresses.

However, as long as we use FileChannel.map, a write to one mmap is guaranteed 
to be visible to the other mmap because they are mapped with the MAP_SHARED 
flag.

https://github.com/adoptium/jdk11u/blob/jdk-11.0.21%2B7/src/java.base/unix/native/libnio/ch/FileChannelImpl.c#L88



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-11-02 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782202#comment-17782202
 ] 

Haruki Okada commented on KAFKA-15609:
--

I validated the MappedByteBuffer behavior with this Java code: 
[https://gist.github.com/ocadaruma/fc26fc122829c63cb61e14d7fc96896d]

 

When we create two mmaps from the same file, writes to the first one are always 
visible to the second one unless we specify MapMode.PRIVATE.
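
The behaviour described above can be illustrated with a short sketch. This is not the linked gist; the class and method names are hypothetical, and the result for MapMode.PRIVATE assumes copy-on-write semantics as on Linux. One mapping writes, a second shared mapping of the same file observes.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TwoMappings {

    /**
     * Maps the same file twice, writes 7 through the mapping created with
     * {@code writerMode}, and returns what the other (shared) mapping sees.
     */
    public static int writeThenObserve(Path file, FileChannel.MapMode writerMode) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The observer is always a shared mapping; it also extends the empty file to 4 bytes.
            MappedByteBuffer observer = channel.map(FileChannel.MapMode.READ_WRITE, 0, Integer.BYTES);
            MappedByteBuffer writer = channel.map(writerMode, 0, Integer.BYTES);
            writer.putInt(0, 7); // no force() is called
            return observer.getInt(0);
        }
    }

    public static void main(String[] args) throws IOException {
        Path shared = Files.createTempFile("two-mmaps-shared", ".bin");
        Path priv = Files.createTempFile("two-mmaps-private", ".bin");
        try {
            System.out.println("READ_WRITE writer, observer sees: "
                    + writeThenObserve(shared, FileChannel.MapMode.READ_WRITE));
            System.out.println("PRIVATE writer, observer sees: "
                    + writeThenObserve(priv, FileChannel.MapMode.PRIVATE));
        } finally {
            Files.deleteIfExists(shared);
            Files.deleteIfExists(priv);
        }
    }
}
```

With a READ_WRITE writer, the observer is expected to see the written value. With a PRIVATE writer, the change stays in a private copy, so the observer keeps seeing the zero-filled content of the newly extended file.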



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-11-02 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782179#comment-17782179
 ] 

Divij Vaidya commented on KAFKA-15609:
--

[~ijuma] No, my understanding (which might be incorrect, I am a bit naive in 
this area) is as follows.

RAM/Physical memory is logically divided into pages by the OS. Each process is 
allocated a range of these pages called the virtual address space (VAS). The 
VAS abstracts away the fragmented nature of physical memory from the process.

mmap() reserves a range of this virtual address space and maps an underlying 
file to it. "Maps an underlying file" means that a portion of the file is read 
into physical memory by the OS and a pointer to it is returned by mmap (this 
pointer can be shared across processes for IPC using the same physical memory). 
The OS now manages reading the underlying storage and loading portions of the 
file into physical memory as needed. Any changes made through the mmap are 
actually performed on the physical memory space reserved by the mmap.

The page cache is a range of pages in physical memory which can be used by any 
application. File I/O is one of the applications using the page cache, such 
that any (non-direct) writes/reads to the OS go via the page cache. 

I think that when the OS reads part of a file into the physical memory reserved 
by the mmap, it reads it into a different space than that occupied by the page 
cache. Hence, writes to the mmap are writes to physical memory but not to the 
page cache.

It can be validated by creating two different mmap for the same file and 
verifying whether they access the same entry in physical memory or not. As per 
my assumption they should not since they should access their own reserved space 
in physical memory.



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-11-02 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782160#comment-17782160
 ] 

Ismael Juma commented on KAFKA-15609:
-

[~divijvaidya] the indexes write immediately to page cache, right? `flush` 
forces it to _disk_, not to page cache.



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-11-02 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782148#comment-17782148
 ] 

Divij Vaidya commented on KAFKA-15609:
--

Hey folks

I did some more digging on this and I am not convinced that this is not a bug 
but I don't have proof right now.

My understanding is as follows:
1. MMap will read/write from page cache in the OS. 
2. MMap flushes data to page cache "eventually" after writes. Note that the 
MappedByteBuffer.force() method forces a flush of MMap to page cache.
3. In Kafka, we flush indexes, leading to MappedByteBuffer.force(), on segment 
roll, but this is performed asynchronously in a separate thread.

Now consider the situation:

1. Data is written to segment and index.
2. Segment rolls and schedules a flush.
3. Since segment is closed, RSM will consider it for copying to remote.
4. RSM will read from page cache which is guaranteed to give correct (even 
non-flushed / dirty) data for segment but it may give out of date data for 
indexes. 

Result: Indexes uploaded to remote tier will not contain all data that is 
expected in it.

I will keep this ticket closed until I have some reproducer but I would solicit 
your thoughts on this.

Proposal: To fix this, we should call AbstractIndex.flush() in RLM before 
passing the index to RSM for upload.
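
As a rough illustration of the proposal above, the sketch below forces a mapped index buffer before handing its path to an uploader. It is not Kafka code: FlushBeforeUpload, Uploader and flushThenUpload are hypothetical names standing in for the RLM/RSM interaction, and the two integers written are illustrative values only. Note that other comments in this thread argue that force() matters for durability rather than for visibility within the same machine.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushBeforeUpload {

    /** Hypothetical stand-in for an RSM plugin: it is handed a path and reads the file itself. */
    public interface Uploader {
        void upload(Path file) throws IOException;
    }

    /** Forces the mapped index content to storage, then hands the path to the uploader. */
    public static void flushThenUpload(MappedByteBuffer indexBuffer, Path indexFile, Uploader uploader)
            throws IOException {
        indexBuffer.force();        // ask the OS to write dirty pages of the mapping to the device
        uploader.upload(indexFile); // the uploader only ever receives a flushed file
    }

    public static void main(String[] args) throws IOException {
        Path indexFile = Files.createTempFile("segment", ".index");
        try (FileChannel channel = FileChannel.open(indexFile,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer index = channel.map(FileChannel.MapMode.READ_WRITE, 0, 8);
            index.putInt(0, 100);   // illustrative value only
            index.putInt(4, 4096);  // illustrative value only
            flushThenUpload(index, indexFile,
                    file -> System.out.println("uploading " + Files.size(file) + " bytes"));
        } finally {
            Files.deleteIfExists(indexFile);
        }
    }
}
```

Putting the force() call inside the hand-off method makes the flush part of the contract, so callers cannot pass an unflushed mapping by accident.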



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-19 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1186#comment-1186
 ] 

Divij Vaidya commented on KAFKA-15609:
--

Hey Luke
The corruption we observed occurred during disk-full or process-crash 
scenarios. The ticket and corresponding fix at 
https://issues.apache.org/jira/browse/KAFKA-15401 by my co-worker should be 
sufficient to address that problem. We don't have complete data but from what 
we have, the exact nature of the corruption is data being truncated/missing 
from the index, hence leading to failures during sanityCheck().

Also, we have another ticket https://issues.apache.org/jira/browse/KAFKA-15612 
to decide on whether we need to flush the index or not. The above conversation 
has been inconclusive because we have conflicting opinions on whether mmap 
needs a flush or not (we all agree that user data files don't need flush since 
OS provides page cache read after write guarantee). We can close this one and 
continue discussion in the ticket I linked here.



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1171#comment-1171
 ] 

Luke Chen commented on KAFKA-15609:
---

[~divijvaidya], do you know where the index file was corrupted? At the end of 
the file? Do you have any more info on this, now that we think it's not related 
to the "unflushed" data? 

I just ran a test that delays flushing data when rolling segments. During the 
delay, I made sure the log segment and index files were uploaded to remote 
storage. In this case, the client can still consume the data correctly.

So, I'm wondering what we're going to do with this ticket. More info about the 
issue would help. If there is none, I think we can close this and re-open it if 
we encounter a similar issue and have more info. WDYT?



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-16 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775965#comment-17775965
 ] 

Haruki Okada commented on KAFKA-15609:
--

> is the OS intelligent enough to understand that it should provide a "dirty" / 
> non-flushed view of the file to the second thread as well?

 

As Ismael pointed out, all file operations go through the page cache (DirectIO 
is the exception, but it isn't used in Kafka), so uploading an unflushed index 
to the remote storage shouldn't be an issue.



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-16 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775964#comment-17775964
 ] 

Satish Duggana commented on KAFKA-15609:


These memory-mapped operations are generally OS/platform dependent. I prefer to 
have a defined contract at the usage level rather than depending on any 
assumptions. In this case, we can have a contract for RLM to make sure these 
files are flushed to disk before file paths are given to RSM to read this data 
and write it into remote storage.



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-16 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775961#comment-17775961
 ] 

Satish Duggana commented on KAFKA-15609:


[~divijvaidya] How are index files read in your RSM implementation to copy them 
to remote storage?  

https://issues.apache.org/jira/browse/KAFKA-15612 was filed based on another 
discussion related to the issue mentioned in this JIRA. 



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-16 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775761#comment-17775761
 ] 

Ismael Juma commented on KAFKA-15609:
-

If you use buffered I/O (not direct I/O), you still go via the page cache. But 
if you do not fsync, it's possible for the disk state to diverge from the 
uploaded state after a crash (for example).
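
A minimal sketch of the "fsync before upload" idea, assuming a hypothetical 
helper that writes one index entry through a mapping and then reads the file 
back the way an uploader might. FlushBeforeUpload and writeForceAndRead are 
illustrative names, not Kafka or RSM APIs; only the java.nio calls are real.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration, not Kafka code.
final class FlushBeforeUpload {

    // Writes one (key, value) entry through a memory mapping, forces it to
    // storage, then returns the bytes an uploader would read from the file.
    static byte[] writeForceAndRead(Path indexFile, int key, int value) throws IOException {
        try (FileChannel ch = FileChannel.open(indexFile,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // READ_WRITE mapping; the file is extended to 8 bytes if it is shorter.
            MappedByteBuffer mmap = ch.map(FileChannel.MapMode.READ_WRITE, 0, 8);
            mmap.putInt(key);
            mmap.putInt(value);
            // Ask the OS to write the mapped changes to the storage device, so
            // the on-disk state matches what is about to be uploaded.
            mmap.force();
            // Also force file content and metadata (e.g. the new length).
            ch.force(true);
        }
        // Only now hand the file contents to the (hypothetical) uploader.
        return Files.readAllBytes(indexFile);
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("example", ".index");
        f.toFile().deleteOnExit();
        byte[] bytes = writeForceAndRead(f, 42, 4096);
        System.out.println("bytes handed to uploader: " + bytes.length);
    }
}
```

The point of forcing first is durability rather than visibility: after a 
crash, the local file and the uploaded copy should not disagree.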



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-16 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775737#comment-17775737
 ] 

Divij Vaidya commented on KAFKA-15609:
--

> So you mean, when a segment rolled, before the data flushed, the RSM is 
> copying the incomplete segment/index into remote tier, right? 

Yes. 

For a log segment, this might not be a problem because the RLM plugin will read 
the log segment from the page cache even if it hasn't been fsync()'ed. I am 
assuming the OS is intelligent enough to ensure that multiple threads read the 
same dirty copy of data from the page cache.

But index files are stored in a MappedByteBuffer (backed by mmapped files) and 
are accessed from two different threads. One thread (the request handler 
thread) accesses the file via the mmap interface, which provides a consistent 
view. The other thread, the RLM plugin thread, reads the file via regular OS 
file I/O without going through the mmap interface. In such cases, is the OS 
intelligent enough to provide the "dirty" / non-flushed view of the file to the 
second thread as well?

I don't have an answer, and we may need some research to establish whether this 
could be a problem. I will wait for [~ocadaruma] to comment here since they 
seem like an expert on the impact of fsync, mmaps and page cache optimizations. 
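
One way to probe this question empirically is a small experiment: write 
through a mapping without calling force(), then read the same file through 
ordinary buffered file I/O. The sketch below is hypothetical 
(MmapVisibilityProbe is an illustrative name, not Kafka code). The Java 
documentation leaves this visibility OS-dependent; on Linux, where shared 
mappings and buffered reads are served from the same page cache, the probe is 
expected to report that the write is visible.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical experiment, not Kafka code.
final class MmapVisibilityProbe {
    private static final long MARKER = 0xCAFEBABEL;

    // Returns true if a write made through a mapping, with no force(), is
    // visible to a read that goes through a separate FileChannel.
    static boolean unflushedWriteVisible(Path file) throws IOException {
        try (FileChannel writer = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mmap = writer.map(FileChannel.MapMode.READ_WRITE, 0, 8);
            mmap.putLong(0, MARKER); // dirty the mapped page; deliberately no force()

            // Independent channel: plain buffered file I/O, no mapping involved.
            try (FileChannel reader = FileChannel.open(file, StandardOpenOption.READ)) {
                ByteBuffer buf = ByteBuffer.allocate(8);
                while (buf.hasRemaining() && reader.read(buf) != -1) {
                    // keep reading until 8 bytes arrive or end of file
                }
                buf.flip();
                return buf.remaining() == 8 && buf.getLong() == MARKER;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("probe", ".index");
        f.toFile().deleteOnExit();
        System.out.println("unflushed mmap write visible to file I/O: "
                + unflushedWriteVisible(f));
    }
}
```

Note that this only checks visibility within a running system; it says nothing 
about what survives a crash, which still depends on fsync/force().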



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775725#comment-17775725
 ] 

Luke Chen commented on KAFKA-15609:
---

Wow, another edge case! Nice find!
So you mean that when a segment is rolled, the RSM may copy the incomplete 
segment/index to the remote tier before the data has been flushed, right? 
Agreed, we should force flush before copying data. I've checked: if we flush 
the same offset multiple times, only the first call acquires the lock and 
performs a real flush operation, so it won't add CPU overhead. 
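
The "only the first flush does real work" behaviour can be sketched with a 
simplified model. This is an assumption-laden illustration, not Kafka's 
UnifiedLog: FlushGuardSketch, recoveryPoint and realFlushes are hypothetical 
names, and the counter stands in for the actual force() calls on segment and 
index files.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplified model of an idempotent flush, not Kafka code.
final class FlushGuardSketch {
    private final Object lock = new Object();
    // Highest offset known to be flushed so far.
    private volatile long recoveryPoint = 0L;
    // Counts how many times the expensive flush path actually ran.
    final AtomicInteger realFlushes = new AtomicInteger();

    void flush(long offset) {
        // Cheap check without the lock: already flushed up to this offset.
        if (offset <= recoveryPoint) {
            return;
        }
        synchronized (lock) {
            // Re-check under the lock in case another thread flushed meanwhile.
            if (offset > recoveryPoint) {
                realFlushes.incrementAndGet(); // stand-in for the real force() calls
                recoveryPoint = offset;
            }
        }
    }

    public static void main(String[] args) {
        FlushGuardSketch log = new FlushGuardSketch();
        log.flush(100);
        log.flush(100); // no lock taken, no real flush
        log.flush(100);
        System.out.println("real flushes: " + log.realFlushes.get());
    }
}
```

Under this model, calling flush before every copy to the remote tier costs one 
volatile read once the offset has already been flushed.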



[jira] [Commented] (KAFKA-15609) Corrupted index uploaded to remote tier

2023-10-16 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775687#comment-17775687
 ] 

Divij Vaidya commented on KAFKA-15609:
--

[~satishd] [~showuon] thoughts?
