[ https://issues.apache.org/jira/browse/HDDS-9844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856026#comment-17856026 ]
Duong edited comment on HDDS-9844 at 6/18/24 6:02 PM:
------------------------------------------------------

Discussed offline with [~weichiu] and [~smeng]. Logically, the hsync() and write() APIs can look like the following.

{code:java}
void hsync() {
  CompletableFuture<PutBlockResponse> putBlockResponseFuture = null;
  Long lastCommitIndex = null;
  synchronized (this) {
    // 1. If there is pending data in the currentBuffer, create a writeChunk request.
    if (totalDataFlushedLength < writtenDataLength) {
      writeChunk = getWriteChunkRequest(currentBuffer);
      // 2. Send the writeChunk request.
      sendWriteChunkAsync(writeChunk);
      // 3. Reset the current buffer to accommodate new writes.
      currentBuffer = reset(currentBuffer);
      // 4. Update putBlock data with the last writeChunk and send putBlock.
      this.containerBlockData.add(writeChunk);
      putBlockResponseFuture = executePutBlockAsync();
    } else {
      lastCommitIndex = getLastPendingCommit();
    }
  }

  // 5. Outside the lock: wait for the putBlock reply and issue a
  //    watchForCommit if this hsync wrote data.
  if (putBlockResponseFuture != null) {
    PutBlockResponse putBlockResponse = putBlockResponseFuture.get();
    // 6. watchForCommit.
    watchForCommit(putBlockResponse.commitIndex);
  } else {
    // Wait for lastCommitIndex to pass.
  }
}

void write(byte[] data, int off, int len) {
  CompletableFuture<PutBlockResponse> putBlockResponseFuture = null;
  synchronized (this) {
    // 1. Write data to the currentBuffer.
    currentBuffer.put(data, off, len);
    // 2. If the buffer is full, send it as a writeChunk followed by a putBlock.
    if (currentBuffer.isFull()) {
      writeChunk = getWriteChunkRequest(currentBuffer);
      sendWriteChunkAsync(writeChunk);
      currentBuffer = reset(currentBuffer);
      this.containerBlockData.add(writeChunk);
      putBlockResponseFuture = executePutBlockAsync();
    }
  }

  // 3. Outside the lock: wait for the putBlock reply and watch for the commit.
  if (putBlockResponseFuture != null) {
    PutBlockResponse putBlockResponse = putBlockResponseFuture.get();
    watchForCommit(putBlockResponse.commitIndex);
  }
}
{code}

Implementation-wise, some points need more thought.

1. The usage of ChunkBuffer and BufferPool.
When a thread creates a writeChunk request from the currentBuffer, a configuration option can enable unsafeWrap. UnsafeWrap lets the writeChunk request content refer directly to the currentBuffer memory instead of copying the data. This avoids creating many temporary buffers for writeChunk and so avoids the GC cost of those copies. (With the copying approach, the garbage produced grows linearly with the amount of data written.) In a multithreaded context, however, once a thread creates a writeChunk request out of the currentBuffer, we need to reserve/lock that buffer until the writeChunk is successfully sent. We need to define exactly how ChunkBuffers are organized to accommodate a variable number of hsync() caller threads.

2. The current implementation contains quite a few wrong abstractions and misplaced responsibilities. E.g.
- CommitWatcher has no business managing the flushFutures (putBlock futures). That should live in the BlockInputStream.
- The abstraction on top of ECBlockInputStream and RatisBlockInputStream is incorrect.
- Much of the complexity of the code comes from (I think) the effort to make it work in multithreaded contexts, but it doesn't. In the end, we have the complexity of a multithreaded implementation with the functionality of a non-thread-safe one.

These issues make the code changes needed to de-synchronize the OutputStream extensive. We can probably do some refactoring to clean them up before proceeding with the real code change.

cc [~weichiu] [~smeng].
@szetszwo, we need your opinions on the pseudocode above and the buffer management.

> [hsync] De-synchronize hsync API
> --------------------------------
>
>                 Key: HDDS-9844
>                 URL: https://issues.apache.org/jira/browse/HDDS-9844
>             Project: Apache Ozone
>          Issue Type: Sub-task
>            Reporter: Wei-Chiu Chuang
>            Assignee: Siyao Meng
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: Screenshot 2024-02-27 at 8.01.48 PM.png
>
>
> The current hsync implementation, KeyOutputStream.hsync(), is wrapped in a
> synchronized block. The HBase write-ahead log FSHLog.SyncRunner has multiple
> threads invoking hsync in parallel to reduce client latency.
> We should unsynchronize Ozone's hsync so that it doesn't block waiting for
> the Ratis transaction to respond.
> cc: [~szetszwo] this is what we discussed offline.
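
As an illustration of the buffer-reservation question raised in point 1 of the comment, here is a minimal sketch of one possible approach: a buffer stays out of the pool from the moment a thread acquires it until the asynchronous send that references it completes. The class and method names (ReservingBufferPool, acquire, releaseWhenSent, available) are hypothetical and are not Ozone's BufferPool or ChunkBuffer API. The sketch uses plain java.nio.ByteBuffer and assumes the send is represented by a CompletableFuture.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch only; not Ozone's BufferPool. */
final class ReservingBufferPool {
  // Buffers that are currently free. A buffer that is absent from this
  // queue is reserved by a writer or by an in-flight send.
  private final BlockingQueue<ByteBuffer> free;

  ReservingBufferPool(int count, int capacity) {
    free = new ArrayBlockingQueue<>(count);
    for (int i = 0; i < count; i++) {
      free.add(ByteBuffer.allocate(capacity));
    }
  }

  /** Blocks until a buffer is free, which applies back-pressure to writers. */
  ByteBuffer acquire() {
    try {
      return free.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a buffer", e);
    }
  }

  /**
   * Keeps the buffer reserved until sendFuture completes, then clears it and
   * returns it to the pool. This sketch recycles the buffer on failure too;
   * a real client may need to keep the data around for a retry instead.
   */
  <T> CompletableFuture<T> releaseWhenSent(ByteBuffer buffer,
      CompletableFuture<T> sendFuture) {
    return sendFuture.whenComplete((reply, error) -> {
      buffer.clear();
      free.add(buffer);
    });
  }

  /** Number of buffers that are currently free. */
  int available() {
    return free.size();
  }
}
```

With this shape, a caller would acquire a buffer under the stream lock, wrap it into the request without copying, and pass the send future to releaseWhenSent so that no other thread can overwrite the memory while the request is in flight. How many buffers the pool needs for a variable number of hsync() callers is left open here, as it is in the comment.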