[
https://issues.apache.org/jira/browse/RATIS-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17236625#comment-17236625
]
Tsz-wo Sze commented on RATIS-979:
----------------------------------
bq. .. can we use Java API FileChannel#transferTo, ...
Yes, we should support transferTo. That's why our DataStreamOutput defines the
method
{code}
CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer)
{code}
similar to the WritableByteChannel method
{code}
int write(ByteBuffer src) throws IOException
{code}
It is easy to wrap a DataStreamOutput as a WritableByteChannel.
{code}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 8219d699..95634b2e 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -21,6 +21,7 @@ import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.protocol.DataStreamReply;
import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
/** An asynchronous output stream supporting zero buffer copying. */
@@ -31,4 +32,6 @@ public interface DataStreamOutput extends
CloseAsync<DataStreamReply> {
}
CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer, boolean
sync);
+
+ WritableByteChannel asWritableByteChannel();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 45ea1be4..358aed4a 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -29,11 +29,13 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
/**
@@ -62,6 +64,31 @@ public class DataStreamClientImpl implements
DataStreamClient {
private final CompletableFuture<DataStreamReply> headerFuture;
private final MemoizedSupplier<CompletableFuture<DataStreamReply>>
closeSupplier
= JavaUtils.memoize(() -> send(Type.STREAM_CLOSE));
+ private final MemoizedSupplier<WritableByteChannel>
writableByteChannelSupplier
+ = JavaUtils.memoize(() -> new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ try {
+ return Math.toIntExact(writeAsync(src).get().getBytesWritten());
+ } catch (Throwable t) {
+ throw IOUtils.asIOException(t);
+ }
+ }
+
+ @Override
+ public boolean isOpen() {
+ return !isClosed();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ closeAsync().get();
+ } catch (Throwable t) {
+ throw IOUtils.asIOException(t);
+ }
+ }
+ });
private long streamOffset = 0;
@@ -71,6 +98,11 @@ public class DataStreamClientImpl implements
DataStreamClient {
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(),
Type.STREAM_HEADER);
}
+ @Override
+ public WritableByteChannel asWritableByteChannel() {
+ return writableByteChannelSupplier.get();
+ }
+
private CompletableFuture<DataStreamReply> send(Type type, ByteBuffer
buffer) {
return orderedStreamAsync.sendRequest(header.getCallId(), streamOffset,
buffer, type);
}
{code}
> Ratis streaming
> ---------------
>
> Key: RATIS-979
> URL: https://issues.apache.org/jira/browse/RATIS-979
> Project: Ratis
> Issue Type: New Feature
> Components: Streaming
> Reporter: Tsz-wo Sze
> Assignee: Tsz-wo Sze
> Priority: Major
> Attachments: RatisStreaming20200929.pdf
>
>
> In this JIRA, we design and implement Ratis Streaming with zero buffer
> copying and asynchronous event driven.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)