This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ee3438  RATIS-167. Implement a high performance OutputStream using 
async API.
4ee3438 is described below

commit 4ee34383eb81f565b6cf06072170f2c130c51e8e
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 3 14:50:25 2019 +0800

    RATIS-167. Implement a high performance OutputStream using async API.
---
 .../apache/ratis/client/impl/RaftOutputStream.java | 136 +++++++++++++++++
 .../apache/ratis/grpc/client/GrpcOutputStream.java |   4 +-
 .../org/apache/ratis/OutputStreamBaseTest.java     | 166 +++++++++++----------
 .../apache/ratis/grpc/TestGrpcOutputStream.java    |  50 +++++++
 .../ratis/grpc/TestRaftOutputStreamWithGrpc.java   |  36 +++++
 5 files changed, 308 insertions(+), 84 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
new file mode 100644
index 0000000..1cdbeac
--- /dev/null
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.SizeInBytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+/** An {@link OutputStream} implementation using {@link 
RaftClient#sendAsync(Message)} API. */
+public class RaftOutputStream extends OutputStream {
+  private final Supplier<RaftClient> client;
+  private final AtomicBoolean closed = new AtomicBoolean();
+  private final Queue<CompletableFuture<Long>> flushFutures = new 
LinkedList<>();
+
+  private final byte[] buffer;
+  private int byteCount;
+  private long byteFlushed;
+
+  public RaftOutputStream(Supplier<RaftClient> clientSupplier, SizeInBytes 
bufferSize) {
+    this.client = JavaUtils.memoize(clientSupplier);
+    this.buffer = new byte[bufferSize.getSizeInt()];
+  }
+
+  private RaftClient getClient() {
+    return client.get();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    checkClosed();
+    buffer[byteCount++] = (byte)b;
+    flushIfNecessary();
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkClosed();
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    for(int total = 0; total < len; ) {
+      final int toWrite = Math.min(len - total, buffer.length - byteCount);
+      System.arraycopy(b, off + total, buffer, byteCount, toWrite);
+      byteCount += toWrite;
+      total += toWrite;
+      flushIfNecessary();
+    }
+  }
+
+  private void flushIfNecessary() {
+    if (byteCount == buffer.length) {
+      flushAsync();
+    }
+  }
+
+  /** Non-blocking flush call */
+  private void flushAsync() {
+    final long pos = byteFlushed;
+    if (byteCount == 0) {
+      return;
+    }
+
+    final CompletableFuture<Long> f = getClient().sendAsync(
+        Message.valueOf(ProtoUtils.toByteString(buffer, 0, byteCount))
+    ).thenApply(reply -> RaftClientImpl.handleRaftException(reply, 
CompletionException::new)
+    ).thenApply(reply -> reply != null && reply.isSuccess()? pos: null);
+    flushFutures.offer(f);
+
+    byteFlushed += byteCount;
+    byteCount = 0;
+  }
+
+  /** Blocking flush call */
+  private void flushImpl() throws IOException {
+    final long pos = byteFlushed;
+    flushAsync();
+    for(; !flushFutures.isEmpty();) {
+      final Long flushed = flushFutures.poll().join();
+      if (flushed == null) {
+        throw new IOException("Failed to flush at position " + pos);
+      }
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    checkClosed();
+    flushImpl();
+  }
+
+  private void checkClosed() throws IOException {
+    if (closed.get()) {
+      throw new IOException(this + " was closed.");
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed.compareAndSet(false, true)) {
+      flushImpl();
+      getClient().close();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "-" + getClient().getId() + 
":byteFlushed=" + byteFlushed;
+  }
+}
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
index e2eff08..7158381 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -102,7 +102,7 @@ public class GrpcOutputStream extends OutputStream {
 
   @Override
   public String toString() {
-    return "GrpcOutputStream-" + clientId;
+    return getClass().getSimpleName() + "-" + clientId;
   }
 
   private void checkClosed() throws IOException {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java 
b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
similarity index 64%
rename from ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
rename to ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index 408f4be..84d99d0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,62 +15,51 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.grpc;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.BaseTest;
-import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.client.GrpcClientStreamer;
-import org.apache.ratis.grpc.client.GrpcOutputStream;
-import org.apache.ratis.protocol.ClientId;
+package org.apache.ratis;
+
+import org.apache.ratis.client.impl.RaftOutputStream;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.StringUtils;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.*;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 import static org.junit.Assert.fail;
 
-@Ignore
-public class TestRaftStream extends BaseTest {
-  static {
-    LogUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL);
-  }
-
-  private static final RaftProperties prop = new RaftProperties();
+public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
   private static final int NUM_SERVERS = 3;
   private static final byte[] BYTES = new byte[4];
 
-  private MiniRaftClusterWithGrpc cluster;
-
-  @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
+  public OutputStream newOutputStream(CLUSTER cluster, int bufferSize) {
+    return new RaftOutputStream(cluster::createClient, 
SizeInBytes.valueOf(bufferSize));
   }
 
-  private byte[] toBytes(int i) {
-    return toBytes(i, BYTES);
-  }
-  private byte[] toBytes(int i, byte[] b) {
+  private static byte[] toBytes(int i) {
+    final byte[] b = BYTES;
     b[0] = (byte) ((i >>> 24) & 0xFF);
     b[1] = (byte) ((i >>> 16) & 0xFF);
     b[2] = (byte) ((i >>> 8) & 0xFF);
@@ -80,25 +69,20 @@ public class TestRaftStream extends BaseTest {
 
   @Test
   public void testSimpleWrite() throws Exception {
-    final int numRequests = 500;
-    LOG.info("Running testSimpleWrite, numRequests=" + numRequests);
-
-    // default 64K is too large for a test
-    GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
-    cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
-
-    cluster.start();
-    RaftServerImpl leader = waitForLeader(cluster);
+    runWithNewCluster(NUM_SERVERS, this::runTestSimpleWrite);
+  }
 
-    try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
-        cluster.getGroup(), leader.getId(), null)) {
+  private void runTestSimpleWrite(CLUSTER cluster) throws Exception {
+    final int numRequests = 5000;
+    final int bufferSize = 4;
+    try (OutputStream out = newOutputStream(cluster, bufferSize)) {
       for (int i = 0; i < numRequests; i++) { // generate requests
         out.write(toBytes(i));
       }
     }
 
     // check the leader's raft log
-    final RaftLog raftLog = leader.getState().getLog();
+    final RaftLog raftLog = cluster.getLeader().getState().getLog();
     final AtomicInteger i = new AtomicInteger();
     checkLog(raftLog, numRequests, () -> toBytes(i.getAndIncrement()));
   }
@@ -106,30 +90,36 @@ public class TestRaftStream extends BaseTest {
   private void checkLog(RaftLog raftLog, long expectedCommittedIndex,
       Supplier<byte[]> s) throws IOException {
     long committedIndex = raftLog.getLastCommittedIndex();
-    Assert.assertEquals(expectedCommittedIndex, committedIndex);
+    Assert.assertTrue(committedIndex >= expectedCommittedIndex);
     // check the log content
-    TermIndex[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1);
+    TermIndex[] entries = raftLog.getEntries(0, Long.MAX_VALUE);
+    int count = 0;
     for (TermIndex entry : entries) {
-      RaftProtos.LogEntryProto log  = raftLog.get(entry.getIndex());
+      LogEntryProto log  = raftLog.get(entry.getIndex());
+      if (!log.hasStateMachineLogEntry()) {
+        continue;
+      }
       byte[] logData = 
log.getStateMachineLogEntry().getLogData().toByteArray();
       byte[] expected = s.get();
-      LOG.info("log " + entry + " " + log.getLogEntryBodyCase() + " " + 
StringUtils.bytes2HexString(logData));
-      Assert.assertEquals(expected.length, logData.length);
-      Assert.assertArrayEquals(expected, logData);
+      final String message = "log " + entry + " " + log.getLogEntryBodyCase()
+          + " " + StringUtils.bytes2HexString(logData)
+          + ", expected=" + StringUtils.bytes2HexString(expected);
+      LOG.info(message);
+      Assert.assertArrayEquals(message, expected, logData);
+      count++;
     }
+    Assert.assertEquals(expectedCommittedIndex, count);
   }
 
   @Test
   public void testWriteAndFlush() throws Exception {
-    LOG.info("Running testWriteAndFlush");
-
-    GrpcConfigKeys.OutputStream.setBufferSize(prop, 
SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
-    cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
-    cluster.start();
+    runWithNewCluster(NUM_SERVERS, this::runTestWriteAndFlush);
+  }
 
+  private void runTestWriteAndFlush(CLUSTER cluster) throws Exception {
+    final int bufferSize = ByteValue.BUFFERSIZE;
     RaftServerImpl leader = waitForLeader(cluster);
-    GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
-        cluster.getGroup(), leader.getId(), null);
+    OutputStream out = newOutputStream(cluster, bufferSize);
 
     int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
     ByteValue[] values = new ByteValue[lengths.length];
@@ -145,8 +135,7 @@ public class TestRaftStream extends BaseTest {
       out.flush();
 
       // make sure after the flush the data has been committed
-      Assert.assertEquals(expectedTxs.size(),
-          leader.getState().getLastAppliedIndex());
+      assertRaftLog(expectedTxs.size(), leader);
     }
     out.close();
 
@@ -162,6 +151,18 @@ public class TestRaftStream extends BaseTest {
         () -> expectedTxs.get(index.getAndIncrement()));
   }
 
+  private RaftLog assertRaftLog(int expectedEntries, RaftServerImpl server) 
throws Exception {
+    final RaftLog raftLog = server.getState().getLog();
+    final EnumMap<LogEntryBodyCase, AtomicLong> counts = 
RaftTestUtil.countEntries(raftLog);
+    Assert.assertEquals(expectedEntries, 
counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
+
+    final LogEntryProto last = 
RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, raftLog);
+    Assert.assertNotNull(last);
+    Assert.assertTrue(raftLog.getLastCommittedIndex() >= last.getIndex());
+    Assert.assertTrue(server.getState().getLastAppliedIndex() >= 
last.getIndex());
+    return raftLog;
+  }
+
   private static class ByteValue {
     final static int BUFFERSIZE = 1024;
 
@@ -200,15 +201,14 @@ public class TestRaftStream extends BaseTest {
 
   @Test
   public void testWriteWithOffset() throws Exception {
-    LOG.info("Running testWriteWithOffset");
-    GrpcConfigKeys.OutputStream.setBufferSize(prop, 
SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
+    runWithNewCluster(NUM_SERVERS, this::runTestWriteWithOffset);
+  }
 
-    cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
-    cluster.start();
+  private void runTestWriteWithOffset(CLUSTER cluster) throws Exception {
+    final int bufferSize = ByteValue.BUFFERSIZE;
     RaftServerImpl leader = waitForLeader(cluster);
 
-    GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
-        cluster.getGroup(), leader.getId(), null);
+    final OutputStream out = newOutputStream(cluster, bufferSize);
 
     byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2];
     Arrays.fill(b1, (byte) 1);
@@ -237,18 +237,21 @@ public class TestRaftStream extends BaseTest {
     }
     out.close();
 
-    final RaftLog log = leader.getState().getLog();
     // 0.5 + 1 + 2.5 + 4 = 8
-    Assert.assertEquals(8, leader.getState().getLastAppliedIndex());
-    Assert.assertEquals(8, log.getLastCommittedIndex());
-    TermIndex[] entries = log.getEntries(1, 9);
-    byte[] actual = new byte[ByteValue.BUFFERSIZE * 8];
+    final int expectedEntries = 8;
+    final RaftLog raftLog = assertRaftLog(expectedEntries, leader);
+
+    final TermIndex[] entries = raftLog.getEntries(1, Long.MAX_VALUE);
+    final byte[] actual = new byte[ByteValue.BUFFERSIZE * expectedEntries];
     totalSize = 0;
-    for (TermIndex e : entries) {
-      byte[] eValue = 
log.get(e.getIndex()).getStateMachineLogEntry().getLogData().toByteArray();
-      Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length);
-      System.arraycopy(eValue, 0, actual, totalSize, eValue.length);
-      totalSize += eValue.length;
+    for (TermIndex ti : entries) {
+      final LogEntryProto e = raftLog.get(ti.getIndex());
+      if (e.hasStateMachineLogEntry()) {
+        final byte[] eValue = 
e.getStateMachineLogEntry().getLogData().toByteArray();
+        Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length);
+        System.arraycopy(eValue, 0, actual, totalSize, eValue.length);
+        totalSize += eValue.length;
+      }
     }
     Assert.assertArrayEquals(expected, actual);
   }
@@ -258,11 +261,11 @@ public class TestRaftStream extends BaseTest {
    */
   @Test
   public void testKillLeader() throws Exception {
-    LOG.info("Running testChangeLeader");
+    runWithNewCluster(NUM_SERVERS, this::runTestKillLeader);
+  }
 
-    GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
-    cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
-    cluster.start();
+  private void runTestKillLeader(CLUSTER cluster) throws Exception {
+    final int bufferSize = 4;
     final RaftServerImpl leader = waitForLeader(cluster);
 
     final AtomicBoolean running  = new AtomicBoolean(true);
@@ -273,8 +276,7 @@ public class TestRaftStream extends BaseTest {
     new Thread(() -> {
       LOG.info("Writer thread starts");
       int count = 0;
-      try (GrpcOutputStream out = new GrpcOutputStream(prop, 
ClientId.randomId(),
-          cluster.getGroup(), leader.getId(), null)) {
+      try (OutputStream out = newOutputStream(cluster, bufferSize)) {
         while (running.get()) {
           out.write(toBytes(count++));
           Thread.sleep(10);
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
new file mode 100644
index 0000000..2258a1e
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.OutputStreamBaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.client.GrpcClientStreamer;
+import org.apache.ratis.grpc.client.GrpcOutputStream;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Ignore;
+
+import java.io.OutputStream;
+
+/**
+ * Test {@link GrpcOutputStream}
+ * TODO: {@link GrpcOutputStream} current has some bugs.
+ */
+@Ignore
+public class TestGrpcOutputStream
+    extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+  static {
+    LogUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL);
+  }
+
+  @Override
+  public OutputStream newOutputStream(MiniRaftClusterWithGrpc cluster, int 
bufferSize) {
+    final RaftProperties p = getProperties();
+    GrpcConfigKeys.OutputStream.setBufferSize(p, 
SizeInBytes.valueOf(bufferSize));
+    return new GrpcOutputStream(p, ClientId.randomId(), cluster.getGroup(), 
cluster.getLeader().getId(), null);
+  }
+}
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
new file mode 100644
index 0000000..1c4202b
--- /dev/null
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.OutputStreamBaseTest;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.util.LogUtils;
+
+public class TestRaftOutputStreamWithGrpc
+    extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+  {
+    LogUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.TRACE);
+  }
+
+  @Override
+  public int getGlobalTimeoutSeconds() {
+    return 30;
+  }
+}

Reply via email to