[ 
https://issues.apache.org/jira/browse/HDDS-9844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856026#comment-17856026
 ] 

Duong edited comment on HDDS-9844 at 6/18/24 6:02 PM:
------------------------------------------------------

Discussed offline with [~weichiu] and [~smeng]. The hsync() and write() APIs could 
logically look like this:
{code:java}
// Logical sketch: fields and helpers such as currentBuffer, containerBlockData,
// getWriteChunkRequest() and executePutBlockAsync() belong to the output stream.
void hsync() {
    CompletableFuture<PutBlockResponse> putBlockResponseFuture = null;
    Long lastCommitIndex = null;
    synchronized (this) {
        // 1. If there is pending data in currentBuffer, create a writeChunk and send it.
        if (totalDataFlushedLength < writtenDataLength) {
            var writeChunk = getWriteChunkRequest(currentBuffer);
            // 2. Send the writeChunk request asynchronously.
            sendWriteChunkAsync(writeChunk);
            // 3. Reset currentBuffer to accommodate new writes.
            currentBuffer = reset(currentBuffer);
            // 4. Add the last writeChunk to the putBlock data and send putBlock.
            this.containerBlockData.add(writeChunk);
            putBlockResponseFuture = executePutBlockAsync();
        } else {
            // Nothing new to flush: remember the last pending commit instead.
            lastCommitIndex = getLastPendingCommit();
        }
    }
    // 5. Outside the lock: wait for the putBlock reply, then watchForCommit
    //    if this hsync flushed data. join() avoids checked exceptions here.
    if (putBlockResponseFuture != null) {
        PutBlockResponse putBlockResponse = putBlockResponseFuture.join();
        // 6. watchForCommit on the commit index returned by putBlock.
        watchForCommit(putBlockResponse.commitIndex);
    } else if (lastCommitIndex != null) {
        // Wait for lastCommitIndex to pass.
        watchForCommit(lastCommitIndex);
    }
}

void write(byte[] data, int off, int len) {
    CompletableFuture<PutBlockResponse> putBlockResponseFuture = null;
    synchronized (this) {
        // 1. Write data to currentBuffer.
        currentBuffer.put(data, off, len);
        // 2. If the buffer is full, flush it with writeChunk + putBlock.
        if (currentBuffer.isFull()) {
            var writeChunk = getWriteChunkRequest(currentBuffer);
            sendWriteChunkAsync(writeChunk);
            currentBuffer = reset(currentBuffer);
            this.containerBlockData.add(writeChunk);
            putBlockResponseFuture = executePutBlockAsync();
        }
    }
    // 3. Outside the lock: wait for putBlock, then watch for its commit.
    if (putBlockResponseFuture != null) {
        PutBlockResponse putBlockResponse = putBlockResponseFuture.join();
        watchForCommit(putBlockResponse.commitIndex);
    }
}{code}
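To make the locking discipline of the pseudo code above concrete, here is a small runnable model in plain Java. It is only a sketch under simplifying assumptions: DesyncStream, sentPutBlocks() and reply() are hypothetical names, lengths stand in for real data, and a putBlock reply is modeled as a CompletableFuture<Long> carrying a commit index that the caller completes by hand instead of a real datanode. What it shows is that hsync() holds the stream lock only while deciding whether to send a putBlock and then waits outside the lock, so write() and other hsync() callers are not blocked by an in-flight round trip.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of a de-synchronized hsync(): lengths stand in for data, and each
// putBlock reply is a future (carrying a commit index) completed by the caller.
class DesyncStream {
    private long writtenDataLength = 0;
    private long totalDataFlushedLength = 0;
    // Replies for the putBlock requests sent so far, in send order.
    private final List<CompletableFuture<Long>> putBlockReplies = new ArrayList<>();
    private CompletableFuture<Long> lastPutBlock = CompletableFuture.completedFuture(0L);

    synchronized void write(int len) {
        writtenDataLength += len;
    }

    /** Returns the commit index this hsync() waited for. */
    long hsync() {
        CompletableFuture<Long> toWaitFor;
        synchronized (this) {
            if (totalDataFlushedLength < writtenDataLength) {
                // Pending data: "send" writeChunk + putBlock; the reply arrives later.
                totalDataFlushedLength = writtenDataLength;
                lastPutBlock = new CompletableFuture<>();
                putBlockReplies.add(lastPutBlock);
            }
            // Either the putBlock just sent or the last pending one.
            toWaitFor = lastPutBlock;
        }
        // Block outside the lock so write() and other hsync() calls can proceed.
        return toWaitFor.join();
    }

    synchronized int sentPutBlocks() {
        return putBlockReplies.size();
    }

    // Simulates the datanode answering the i-th putBlock with a commit index.
    void reply(int i, long commitIndex) {
        CompletableFuture<Long> f;
        synchronized (this) {
            f = putBlockReplies.get(i);
        }
        f.complete(commitIndex);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        DesyncStream out = new DesyncStream();
        out.write(100);
        CompletableFuture<Long> first = CompletableFuture.supplyAsync(out::hsync, pool);
        while (out.sentPutBlocks() < 1) Thread.onSpinWait();
        // The first hsync() is waiting for its reply, yet write() still gets the lock.
        out.write(50);
        CompletableFuture<Long> second = CompletableFuture.supplyAsync(out::hsync, pool);
        while (out.sentPutBlocks() < 2) Thread.onSpinWait();
        // No new data since the second putBlock: this call waits on that pending putBlock.
        CompletableFuture<Long> third = CompletableFuture.supplyAsync(out::hsync, pool);
        out.reply(0, 7L);
        out.reply(1, 8L);
        System.out.println(first.join() + " " + second.join() + " " + third.join());
        pool.shutdown();
    }
}
{code}

Running main prints "7 8 8": the third hsync() found no new data and simply waited on the last pending putBlock, which corresponds to the "wait for lastCommitIndex to pass" branch.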

Implementation-wise, some points need more thought. 

1. The usage of ChunkBuffer and BufferPool. When a thread creates a writeChunk 
request from the currentBuffer, a configuration option can enable unsafeWrap. 
With unsafeWrap, the writeChunk request content refers directly to the 
currentBuffer memory instead of a copy of the data. This avoids allocating a 
temporary buffer for every writeChunk and therefore the associated GC cost 
(with the copying approach, GC cost grows with the amount of data written: 
O(GC) = O(written data)).
However, in a multithreaded context, after a thread creates a writeChunk 
request from the currentBuffer, that buffer must be reserved (locked) until the 
writeChunk has been sent successfully. We need to define exactly how 
ChunkBuffers are organized to accommodate a variable number of hsync() caller 
threads.

2. The current implementation has quite a few misplaced abstractions and 
responsibilities in its components. For example:
- CommitWatcher should not manage the flushFutures (putBlock futures); that 
belongs in the BlockOutputStream.
- The abstraction on top of ECBlockOutputStream and RatisBlockOutputStream is 
incorrect.
- Much of the code's complexity seems to come from efforts to make it work in 
multithreaded contexts, yet it does not actually work there. In the end, we have 
the complexity of a multithreaded implementation with the guarantees of a 
non-thread-safe one.
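For the buffer reservation in point 1, one possible approach (an assumption for illustration, not the existing BufferPool behavior) is to keep the in-flight buffer out of the free list and return it to the pool only when its send completes. In the sketch below, ReservingBufferPool, ZeroCopyWriter and the sender function are hypothetical; ByteBuffer.duplicate() stands in for unsafeWrap because the view shares the original buffer's memory instead of copying it.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical pool: a buffer referenced by an in-flight writeChunk is "reserved"
// (absent from the free list) until its send completes.
class ReservingBufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;
    private int allocated = 0;

    ReservingBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    synchronized ByteBuffer acquire() {
        ByteBuffer b = free.poll();
        if (b == null) {
            allocated++;
            b = ByteBuffer.allocate(bufferSize);
        }
        return b;
    }

    synchronized void release(ByteBuffer b) {
        b.clear();
        free.push(b);
    }

    synchronized int allocated() {
        return allocated;
    }

    public static void main(String[] args) {
        ReservingBufferPool pool = new ReservingBufferPool(16);
        ZeroCopyWriter writer = new ZeroCopyWriter(pool);
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        // Hypothetical stand-in for sendWriteChunkAsync: done when we complete the future.
        Function<ByteBuffer, CompletableFuture<Void>> sender = view -> {
            CompletableFuture<Void> sent = new CompletableFuture<>();
            inFlight.add(sent);
            return sent;
        };
        writer.write("abc".getBytes(StandardCharsets.UTF_8));
        writer.flush(sender);
        writer.write("def".getBytes(StandardCharsets.UTF_8));
        writer.flush(sender);
        System.out.println("allocated while in flight: " + pool.allocated());
        inFlight.forEach(sent -> sent.complete(null));
        writer.flush(sender); // reuses a released buffer
        System.out.println("allocated after sends complete: " + pool.allocated());
    }
}

class ZeroCopyWriter {
    private final ReservingBufferPool pool;
    private ByteBuffer current;

    ZeroCopyWriter(ReservingBufferPool pool) {
        this.pool = pool;
        this.current = pool.acquire();
    }

    synchronized void write(byte[] data) {
        current.put(data); // sketch: assumes data fits in the remaining space
    }

    CompletableFuture<Void> flush(Function<ByteBuffer, CompletableFuture<Void>> sender) {
        ByteBuffer reserved;
        ByteBuffer view;
        synchronized (this) {
            reserved = current;
            view = reserved.duplicate(); // shares memory with reserved: no copy
            view.flip();                 // expose only the bytes written so far
            current = pool.acquire();    // new writes go to another buffer
        }
        // Return the reserved buffer to the pool only after the send completes.
        return sender.apply(view).whenComplete((ok, err) -> pool.release(reserved));
    }
}
{code}

Both printed counts are 3: while two sends are in flight their buffers stay reserved, so a third buffer is allocated; once the sends complete, a released buffer is reused instead of allocating a fourth.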

These points make the code changes needed to de-synchronize the OutputStream 
extensive. We could do some refactoring to clean them up before proceeding with 
the actual code change.
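As an illustration of the refactoring direction for point 2, the split of responsibilities could look like the sketch below. CommitIndexWatcher and OwningStream are hypothetical names, not the existing Ozone classes: the watcher only tracks the highest committed index and completes waiters, while the stream owns its in-flight putBlock futures and chains watchForCommit onto each reply.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical watcher with a single job: track the highest committed index and
// complete the waiters it covers. It holds no putBlock futures.
class CommitIndexWatcher {
    private long committed = 0;
    private final TreeMap<Long, CompletableFuture<Long>> waiters = new TreeMap<>();

    synchronized CompletableFuture<Long> watchForCommit(long index) {
        if (index <= committed) {
            return CompletableFuture.completedFuture(committed);
        }
        return waiters.computeIfAbsent(index, i -> new CompletableFuture<>());
    }

    // Called when the replication layer reports a new commit index.
    void onCommit(long newIndex) {
        final List<CompletableFuture<Long>> ready;
        final long now;
        synchronized (this) {
            committed = Math.max(committed, newIndex);
            now = committed;
            Map<Long, CompletableFuture<Long>> covered = waiters.headMap(committed, true);
            ready = new ArrayList<>(covered.values());
            covered.clear(); // also removes these entries from waiters
        }
        ready.forEach(f -> f.complete(now)); // complete outside the lock
    }

    public static void main(String[] args) {
        CommitIndexWatcher watcher = new CommitIndexWatcher();
        OwningStream stream = new OwningStream(watcher);
        CompletableFuture<Long> putBlockReply = new CompletableFuture<>();
        CompletableFuture<Long> committedFuture = stream.flushAndWatch(putBlockReply);
        putBlockReply.complete(5L); // putBlock answered with commit index 5
        System.out.println(stream.inFlightPutBlocks() + " " + committedFuture.isDone());
        watcher.onCommit(5L);
        System.out.println(committedFuture.join());
    }
}

// Hypothetical stream side: it, not the watcher, owns the in-flight putBlock futures.
class OwningStream {
    private final Deque<CompletableFuture<Long>> putBlockFutures = new ArrayDeque<>();
    private final CommitIndexWatcher watcher;

    OwningStream(CommitIndexWatcher watcher) {
        this.watcher = watcher;
    }

    CompletableFuture<Long> flushAndWatch(CompletableFuture<Long> putBlockReply) {
        synchronized (this) {
            putBlockFutures.add(putBlockReply);
        }
        putBlockReply.whenComplete((r, e) -> {
            synchronized (this) {
                putBlockFutures.remove(putBlockReply);
            }
        });
        // Once putBlock returns its commit index, delegate the wait to the watcher.
        return putBlockReply.thenCompose(watcher::watchForCommit);
    }

    synchronized int inFlightPutBlocks() {
        return putBlockFutures.size();
    }
}
{code}

Running main prints "0 false" and then "5": the putBlock future leaves the stream's in-flight set as soon as it is answered, but the caller keeps waiting until the watcher learns that index 5 is committed.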

cc [~weichiu] [~smeng]. [~szetszwo], we would like your opinion on the pseudo code 
above and on the buffer management.





> [hsync] De-synchronize hsync API
> --------------------------------
>
>                 Key: HDDS-9844
>                 URL: https://issues.apache.org/jira/browse/HDDS-9844
>             Project: Apache Ozone
>          Issue Type: Sub-task
>            Reporter: Wei-Chiu Chuang
>            Assignee: Siyao Meng
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: Screenshot 2024-02-27 at 8.01.48 PM.png
>
>
> The current hsync implementation, KeyOutputStream.hsync(), is wrapped in a 
> synchronized block. The HBase write-ahead log FSHLog.SyncRunner has multiple 
> threads invoking hsync in parallel to reduce client latency.
> We should de-synchronize Ozone's hsync so that it does not block while waiting 
> for the Ratis transaction to respond.
> cc: [~szetszwo] this is what we discussed offline.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
