[ 
https://issues.apache.org/jira/browse/RATIS-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17236625#comment-17236625
 ] 

Tsz-wo Sze commented on RATIS-979:
----------------------------------

bq. .. can we use Java API FileChannel#transferTo, ...

Yes, we should support transferTo.  That's why our DataStreamOutput defines the 
method
{code}
  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer)
{code}
similar to the WritableByteChannel method
{code}
  int write(ByteBuffer src) throws IOException
{code}
It is easy to wrap a DataStreamOutput as a WritableByteChannel, as the following patch shows.
{code}
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java 
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 8219d699..95634b2e 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -21,6 +21,7 @@ import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.protocol.DataStreamReply;
 
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.CompletableFuture;
 
 /** An asynchronous output stream supporting zero buffer copying. */
@@ -31,4 +32,6 @@ public interface DataStreamOutput extends 
CloseAsync<DataStreamReply> {
   }
 
   CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer, boolean 
sync);
+
+  WritableByteChannel asWritableByteChannel();
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 45ea1be4..358aed4a 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -29,11 +29,13 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -62,6 +64,31 @@ public class DataStreamClientImpl implements 
DataStreamClient {
     private final CompletableFuture<DataStreamReply> headerFuture;
     private final MemoizedSupplier<CompletableFuture<DataStreamReply>> 
closeSupplier
         = JavaUtils.memoize(() -> send(Type.STREAM_CLOSE));
+    private final MemoizedSupplier<WritableByteChannel> 
writableByteChannelSupplier
+        = JavaUtils.memoize(() -> new WritableByteChannel() {
+      @Override
+      public int write(ByteBuffer src) throws IOException {
+        try {
+          return Math.toIntExact(writeAsync(src).get().getBytesWritten());
+        } catch (Throwable t) {
+          throw IOUtils.asIOException(t);
+        }
+      }
+
+      @Override
+      public boolean isOpen() {
+        return !isClosed();
+      }
+
+      @Override
+      public void close() throws IOException {
+        try {
+          closeAsync().get();
+        } catch (Throwable t) {
+          throw IOUtils.asIOException(t);
+        }
+      }
+    });
 
     private long streamOffset = 0;
 
@@ -71,6 +98,11 @@ public class DataStreamClientImpl implements 
DataStreamClient {
           
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(),
 Type.STREAM_HEADER);
     }
 
+    @Override
+    public WritableByteChannel asWritableByteChannel() {
+      return writableByteChannelSupplier.get();
+    }
+
     private CompletableFuture<DataStreamReply> send(Type type, ByteBuffer 
buffer) {
       return orderedStreamAsync.sendRequest(header.getCallId(), streamOffset, 
buffer, type);
     }
{code}

> Ratis streaming
> ---------------
>
>                 Key: RATIS-979
>                 URL: https://issues.apache.org/jira/browse/RATIS-979
>             Project: Ratis
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Tsz-wo Sze
>            Assignee: Tsz-wo Sze
>            Priority: Major
>         Attachments: RatisStreaming20200929.pdf
>
>
> In this JIRA, we design and implement Ratis Streaming with zero buffer 
> copying and an asynchronous, event-driven architecture.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to