http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java deleted file mode 100644 index e04d141..0000000 --- a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft; - -import org.apache.log4j.Level; -import org.apache.raft.RaftTestUtil.SimpleMessage; -import org.apache.raft.client.RaftClient; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.examples.RaftExamplesTestUtil; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.simulation.RequestHandler; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.statemachine.SimpleStateMachine4Testing; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.util.RaftUtils; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -/** - * Test restarting raft peers. - */ -@RunWith(Parameterized.class) -public class TestRestartRaftPeer { - static Logger LOG = LoggerFactory.getLogger(TestRestartRaftPeer.class); - static { - RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - @Parameterized.Parameters - public static Collection<Object[]> data() throws IOException { - RaftProperties prop = new RaftProperties(); - prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, - SimpleStateMachine4Testing.class, StateMachine.class); - prop.setInt(RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 1024 * 8); - return RaftExamplesTestUtil.getMiniRaftClusters(prop, 3); - } - - @Parameterized.Parameter - public MiniRaftCluster cluster; - - @Rule - public Timeout globalTimeout = new Timeout(60 * 1000); - - @Test - public void restartFollower() throws Exception { - cluster.start(); - RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("client", leaderId); - - // write some messages - final byte[] content = new byte[1024]; - Arrays.fill(content, (byte) 1); - final SimpleMessage message = new SimpleMessage(new String(content)); - for (int i = 0; i < 10; i++) { - Assert.assertTrue(client.send(message).isSuccess()); - } - - // restart a follower - String followerId = cluster.getFollowers().get(0).getId(); - LOG.info("Restart follower {}", followerId); - cluster.restartServer(followerId, false); - - // write some more messages - for (int i = 0; i < 10; i++) { - Assert.assertTrue(client.send(message).isSuccess()); - } - client.close(); - - // make sure the restarted follower can catchup - boolean catchup = false; - long lastAppliedIndex = 0; - for (int i = 0; i < 10 && !catchup; i++) { - Thread.sleep(500); - lastAppliedIndex = cluster.getServer(followerId).getState().getLastAppliedIndex(); - catchup = lastAppliedIndex >= 20; - } - Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup); - - // make sure the restarted peer's log segments is correct - cluster.restartServer(followerId, false); - Assert.assertTrue(cluster.getServer(followerId).getState().getLog() - .getLastEntry().getIndex() >= 20); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/examples/RaftExamplesTestUtil.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/examples/RaftExamplesTestUtil.java b/raft-examples/src/test/java/org/apache/raft/examples/RaftExamplesTestUtil.java deleted file mode 100644 index ec622f4..0000000 --- a/raft-examples/src/test/java/org/apache/raft/examples/RaftExamplesTestUtil.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.examples; - -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.grpc.MiniRaftClusterWithGRpc; -import org.apache.raft.hadooprpc.MiniRaftClusterWithHadoopRpc; -import org.apache.raft.netty.MiniRaftClusterWithNetty; -import org.apache.raft.server.simulation.MiniRaftClusterWithSimulatedRpc; -import org.apache.raft.statemachine.StateMachine; - -import java.io.IOException; -import java.util.*; - -public class RaftExamplesTestUtil { - private static void add( - Collection<Object[]> clusters, MiniRaftCluster.Factory factory, - String[] ids, RaftProperties properties) - throws IOException { - clusters.add(new Object[]{factory.newCluster(ids, properties, true)}); - } - - public static Collection<Object[]> getMiniRaftClusters( - RaftProperties prop, int clusterSize, Class<?>... clusterClasses) - throws IOException { - final List<Class<?>> classes = Arrays.asList(clusterClasses); - final boolean isAll = classes.isEmpty(); //empty means all - - final Iterator<String[]> ids = new Iterator<String[]>() { - private int i = 0; - @Override - public boolean hasNext() { - return true; - } - @Override - public String[] next() { - return MiniRaftCluster.generateIds(clusterSize, i++*clusterSize); - } - }; - - final List<Object[]> clusters = new ArrayList<>(); - - if (isAll || classes.contains(MiniRaftClusterWithSimulatedRpc.class)) { - add(clusters, MiniRaftClusterWithSimulatedRpc.FACTORY, ids.next(), prop); - } - if (isAll || classes.contains(MiniRaftClusterWithHadoopRpc.class)) { - add(clusters, MiniRaftClusterWithHadoopRpc.FACTORY, ids.next(), prop); - } - if (isAll || classes.contains(MiniRaftClusterWithNetty.class)) { - add(clusters, MiniRaftClusterWithNetty.FACTORY, ids.next(), prop); - } - if (isAll || classes.contains(MiniRaftClusterWithGRpc.class)) { - add(clusters, MiniRaftClusterWithGRpc.FACTORY, ids.next(), prop); - } - return clusters; - } - - public static <S extends StateMachine> Collection<Object[]> getMiniRaftClusters( - Class<S> stateMachineClass, Class<?>... clusterClasses) throws IOException { - final RaftProperties prop = new RaftProperties(); - prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, - stateMachineClass, StateMachine.class); - return getMiniRaftClusters(prop, 3, clusterClasses); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/TestArithmetic.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/TestArithmetic.java b/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/TestArithmetic.java deleted file mode 100644 index 27b3814..0000000 --- a/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/TestArithmetic.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.examples.arithmetic; - - -import org.apache.log4j.Level; -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.RaftTestUtil; -import org.apache.raft.client.RaftClient; -import org.apache.raft.examples.RaftExamplesTestUtil; -import org.apache.raft.examples.arithmetic.expression.*; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.util.RaftUtils; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Collection; - -@RunWith(Parameterized.class) -public class TestArithmetic { - static { - RaftUtils.setLogLevel(ArithmeticStateMachine.LOG, Level.ALL); - } - - @Parameterized.Parameters - public static Collection<Object[]> data() throws IOException { - return RaftExamplesTestUtil.getMiniRaftClusters(ArithmeticStateMachine.class); - } - - @Parameterized.Parameter - public MiniRaftCluster cluster; - - @Test - public void testPythagorean() throws Exception { - cluster.start(); - RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("pythagorean", leaderId); - - final Variable a = new Variable("a"); - final Variable b = new Variable("b"); - final Variable c = new Variable("c"); - final BinaryExpression a2 = new BinaryExpression(BinaryExpression.Op.MULT, a, a); - final BinaryExpression b2 = new BinaryExpression(BinaryExpression.Op.MULT, b, b); - final BinaryExpression c2 = new BinaryExpression(BinaryExpression.Op.ADD, a2, b2); - final AssignmentMessage pythagorean = new AssignmentMessage(c, - new UnaryExpression(UnaryExpression.Op.SQRT, c2)); - - final AssignmentMessage nullA = new AssignmentMessage(a, NullValue.getInstance()); - final AssignmentMessage nullB = new AssignmentMessage(b, NullValue.getInstance()); - final AssignmentMessage nullC = new AssignmentMessage(c, NullValue.getInstance()); - - for(int n = 3; n < 100; n += 2) { - int n2 = n*n; - int half_n2 = n2/2; - - RaftClientReply r; - r = client.send(new AssignmentMessage(a, new DoubleValue(n))); - assertRaftClientReply(r, (double)n); - r = client.sendReadOnly(Expression.Utils.toMessage(a2)); - assertRaftClientReply(r, (double)n2); - r = client.send(new AssignmentMessage(b, new DoubleValue(half_n2))); - assertRaftClientReply(r, (double)half_n2); - r = client.sendReadOnly(Expression.Utils.toMessage(b2)); - assertRaftClientReply(r, (double)half_n2*half_n2); - r = client.send(pythagorean); - assertRaftClientReply(r, (double)half_n2 + 1); - - r = client.send(nullA); - assertRaftClientReply(r, null); - r = client.send(nullB); - assertRaftClientReply(r, null); - r = client.send(nullC); - assertRaftClientReply(r, null); - } - client.close(); - cluster.shutdown(); - } - - static void assertRaftClientReply(RaftClientReply reply, Double expected) { - Assert.assertTrue(reply.isSuccess()); - final Expression e = Expression.Utils.bytes2Expression( - reply.getMessage().getContent().toByteArray(), 0); - Assert.assertEquals(expected, e.evaluate(null)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/expression/TestExpression.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/expression/TestExpression.java b/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/expression/TestExpression.java deleted file mode 100644 index f06c88e..0000000 --- a/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/expression/TestExpression.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.examples.arithmetic.expression; - - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - -public class TestExpression { - static final Logger LOG = LoggerFactory.getLogger(TestExpression.class); - - @Test - public void testArithmeticUtils() throws Exception { - final Random ran = ThreadLocalRandom.current(); - final byte[] buf = new byte[1024]; - int offset = 0; - - for(int i = 0; i < 10; i++) { - { - final int n = ran.nextInt(); - Expression.Utils.int2bytes(n, buf, offset); - final int m = Expression.Utils.bytes2int(buf, offset); - Assert.assertEquals(n, m); - offset += 4; - } - { - final long n = ran.nextLong(); - Expression.Utils.long2bytes(n, buf, offset); - final long m = Expression.Utils.bytes2long(buf, offset); - Assert.assertEquals(n, m); - offset += 8; - } - { - final double n = ran.nextDouble(); - Expression.Utils.double2bytes(n, buf, offset); - final double m = Expression.Utils.bytes2double(buf, offset); - Assert.assertTrue(n == m); - offset += 8; - } - } - } - @Test - public void testOp() throws Exception { - for(BinaryExpression.Op op : BinaryExpression.Op.values()) { - final byte b = op.byteValue(); - Assert.assertEquals(op, BinaryExpression.Op.valueOf(b)); - } - for(UnaryExpression.Op op : UnaryExpression.Op.values()) { - final byte b = op.byteValue(); - Assert.assertEquals(op, UnaryExpression.Op.valueOf(b)); - } - } - - @Test - public void testExpression() throws Exception { - final byte[] buf = new byte[1024]; - int offset = 0; - - { - final Variable a = new Variable("pi"); - LOG.info("var a: " + a); - final int len = a.toBytes(buf, offset); - final Variable a2 = new Variable(buf, offset); - LOG.info("var a2: " + a2); - Assert.assertEquals(a.getName(), a2.getName()); - Assert.assertEquals(len, a.length()); - Assert.assertEquals(len, a2.length()); - offset += len; - } - - { - final DoubleValue three = new DoubleValue(3); - LOG.info("double three: " + three.evaluate(null)); - final int len = three.toBytes(buf, offset); - final DoubleValue three2 = new DoubleValue(buf, offset); - LOG.info("double three2: " + three2.evaluate(null)); - Assert.assertTrue(three.evaluate(null).equals(three2.evaluate(null))); - Assert.assertEquals(len, three.length()); - Assert.assertEquals(len, three2.length()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java deleted file mode 100644 index 0832579..0000000 --- a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import org.apache.log4j.Level; -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.RaftTestUtil; -import org.apache.raft.client.RaftClient; -import org.apache.raft.examples.RaftExamplesTestUtil; -import org.apache.raft.protocol.Message; -import org.apache.raft.protocol.StateMachineException; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.simulation.RequestHandler; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.util.RaftUtils; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.CompletableFuture; - -import static org.junit.Assert.fail; - -@RunWith(Parameterized.class) -public class TestRaftStateMachineException { - static { - RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - protected static class StateMachineWithException extends SimpleStateMachine4Testing { - @Override - public CompletableFuture<Message> applyTransaction(TransactionContext trx) { - CompletableFuture<Message> future = new CompletableFuture<>(); - future.completeExceptionally(new StateMachineException("Fake Exception")); - return future; - } - } - - @Parameterized.Parameters - public static Collection<Object[]> data() throws IOException { - return RaftExamplesTestUtil.getMiniRaftClusters( - StateMachineWithException.class); - } - - @Parameterized.Parameter - public MiniRaftCluster cluster; - - @Test - public void testHandleStateMachineException() throws Exception { - cluster.start(); - RaftTestUtil.waitForLeader(cluster); - - final String leaderId = cluster.getLeader().getId(); - - try(final RaftClient client = cluster.createClient("client", leaderId)) { - client.send(new RaftTestUtil.SimpleMessage("m")); - fail("Exception expected"); - } catch (StateMachineException e) { - e.printStackTrace(); - Assert.assertTrue(e.getMessage().contains("Fake Exception")); - } - - cluster.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/resources/log4j.properties b/raft-examples/src/test/resources/log4j.properties deleted file mode 100644 index ced0687..0000000 --- a/raft-examples/src/test/resources/log4j.properties +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# log4j configuration used during build and unit tests - -log4j.rootLogger=info,stdout -log4j.threshold=ALL -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/pom.xml ---------------------------------------------------------------------- diff --git a/raft-grpc/pom.xml b/raft-grpc/pom.xml deleted file mode 100644 index 9b712fd..0000000 --- a/raft-grpc/pom.xml +++ /dev/null @@ -1,93 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <artifactId>raft-project-dist</artifactId> - <groupId>com.hortonworks.raft</groupId> - <version>1.0-SNAPSHOT</version> - <relativePath>../raft-project-dist</relativePath> - </parent> - - <artifactId>raft-grpc</artifactId> - <name>Raft gRPC Support</name> - - <dependencies> - <dependency> - <artifactId>raft-proto-shaded</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>provided</scope> - </dependency> - - <dependency> - <artifactId>raft-common</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>provided</scope> - </dependency> - <dependency> - <artifactId>raft-common</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <artifactId>raft-client</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>provided</scope> - </dependency> - <dependency> - <artifactId>raft-client</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <artifactId>raft-server</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>provided</scope> - </dependency> - <dependency> - <artifactId>raft-server</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java deleted file mode 100644 index 1184e2e..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc; - -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.grpc.client.RaftClientProtocolService; -import org.apache.raft.grpc.server.RaftServerProtocolClient; -import org.apache.raft.grpc.server.RaftServerProtocolService; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.RaftServer; -import org.apache.raft.server.RaftServerRpc; -import org.apache.raft.shaded.io.grpc.Server; -import org.apache.raft.shaded.io.grpc.ServerBuilder; -import org.apache.raft.shaded.io.grpc.netty.NettyServerBuilder; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.util.CodeInjectionForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT; -import static org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY; - -public class RaftGRpcService implements RaftServerRpc { - static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class); - public static final String GRPC_SEND_SERVER_REQUEST = - RaftGRpcService.class.getSimpleName() + ".sendRequest"; - - private final Server server; - private final InetSocketAddress address; - private final Map<String, RaftServerProtocolClient> peers = - Collections.synchronizedMap(new HashMap<>()); - private final String selfId; - - public RaftGRpcService(RaftServer raftServer, RaftProperties properties) { - int port = properties.getInt(RAFT_GRPC_SERVER_PORT_KEY, - RAFT_GRPC_SERVER_PORT_DEFAULT); - int maxMessageSize = properties.getInt( - RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_KEY, - RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT); - ServerBuilder serverBuilder = ServerBuilder.forPort(port); - selfId = raftServer.getId(); - server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize) - .addService(new RaftServerProtocolService(selfId, raftServer)) - .addService(new RaftClientProtocolService(selfId, raftServer)) - .build(); - - // start service to determine the port (in case port is configured as 0) - startService(); - address = new InetSocketAddress(server.getPort()); - LOG.info("Server started, listening on " + address.getPort()); - } - - @Override - public void start() { - // do nothing - } - - private void startService() { - try { - server.start(); - } catch (IOException e) { - LOG.error("Failed to start Grpc server", e); - System.exit(1); - } - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - RaftGRpcService.this.close(); - System.err.println("*** server shut down"); - } - }); - } - - @Override - public void close() { - if (server != null) { - server.shutdown(); - } - shutdownClients(); - } - - @Override - public InetSocketAddress getInetSocketAddress() { - return address; - } - - @Override - public AppendEntriesReplyProto appendEntries( - AppendEntriesRequestProto request) throws IOException { - throw new UnsupportedOperationException( - "Blocking AppendEntries call is not supported"); - } - - @Override - public InstallSnapshotReplyProto installSnapshot( - InstallSnapshotRequestProto request) throws IOException { - throw new UnsupportedOperationException( - "Blocking InstallSnapshot call is not supported"); - } - - @Override - public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) - throws IOException { - CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, selfId, - null, request); - - RaftServerProtocolClient target = Preconditions.checkNotNull( - peers.get(request.getServerRequest().getReplyId())); - return target.requestVote(request); - } - - @Override - public void addPeers(Iterable<RaftPeer> newPeers) { - for (RaftPeer p : newPeers) { - if (!peers.containsKey(p.getId())) { - peers.put(p.getId(), new RaftServerProtocolClient(p)); - } - } - } - - private void shutdownClients() { - peers.values().forEach(RaftServerProtocolClient::shutdown); - } - - public RaftServerProtocolClient getRpcClient(RaftPeer peer) { - return peers.get(peer.getId()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcConfigKeys.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcConfigKeys.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcConfigKeys.java deleted file mode 100644 index 395848b..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcConfigKeys.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc; - -import org.apache.raft.client.RaftClientConfigKeys; - -public interface RaftGrpcConfigKeys { - String PREFIX = "raft.grpc"; - - String RAFT_GRPC_SERVER_PORT_KEY = PREFIX + ".server.port"; - int RAFT_GRPC_SERVER_PORT_DEFAULT = 0; - - String RAFT_GRPC_MESSAGE_MAXSIZE_KEY = PREFIX + ".message.maxsize"; - int RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB - - String RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY = - PREFIX + "leader.max.outstanding.appends"; - int RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT = 128; - - String RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY = - PREFIX + "client.max.outstanding.appends"; - int RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT = 128; - - String RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY = "raft.outputstream.buffer.size"; - int RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT = 64 * 1024; - - String RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY = "raft.outputstream.max.retry.times"; - int RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT = 5; - - String RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY = "raft.outputstream.retry.interval"; - long RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT; -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcUtil.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcUtil.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcUtil.java deleted file mode 100644 index 6afb39b..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcUtil.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc; - -import org.apache.raft.shaded.io.grpc.Metadata; -import org.apache.raft.shaded.io.grpc.Status; -import org.apache.raft.shaded.io.grpc.StatusRuntimeException; -import org.apache.raft.util.RaftUtils; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.lang.reflect.Constructor; - -public class RaftGrpcUtil { - public static final Metadata.Key<String> EXCEPTION_TYPE_KEY = - Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER); - - public static String stringifyException(Throwable e) { - StringWriter stm = new StringWriter(); - PrintWriter wrt = new PrintWriter(stm); - e.printStackTrace(wrt); - wrt.close(); - return stm.toString(); - } - - public static StatusRuntimeException wrapException(Throwable t) { - Metadata trailers = new Metadata(); - trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName()); - return new StatusRuntimeException( - Status.INTERNAL.withDescription(RaftGrpcUtil.stringifyException(t)), - trailers); - } - - public static IOException unwrapException(StatusRuntimeException se) { - final Metadata trailers = se.getTrailers(); - final Status status = se.getStatus(); - if (trailers != null && status != null) { - final String className = trailers.get(EXCEPTION_TYPE_KEY); - if (className != null) { - try { - Class<?> clazz = Class.forName(className); - final Exception unwrapped = instantiateException( - clazz.asSubclass(Exception.class), status.getDescription(), se); - return RaftUtils.asIOException(unwrapped); - } catch (Exception e) { - return new IOException(se); - } - } - } - return new IOException(se); - } - - public static IOException unwrapIOException(Throwable t) { - final IOException e; - if (t instanceof StatusRuntimeException) { - e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); - } else { - e = RaftUtils.asIOException(t); - } - return e; - } - - private static Exception instantiateException(Class<? extends Exception> cls, - String message, Exception from) throws Exception { - Constructor<? extends Exception> cn = cls.getConstructor(String.class); - cn.setAccessible(true); - Exception ex = cn.newInstance(message); - ex.initCause(from); - return ex; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/AppendStreamer.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/AppendStreamer.java deleted file mode 100644 index 9cf8cd5..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/AppendStreamer.java +++ /dev/null @@ -1,395 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc.client; - -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.grpc.RaftGrpcConfigKeys; -import org.apache.raft.grpc.RaftGrpcUtil; -import org.apache.raft.protocol.NotLeaderException; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.shaded.com.google.protobuf.ByteString; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.raft.shaded.proto.RaftProtos.RaftRpcRequestProto; -import org.apache.raft.util.Daemon; -import org.apache.raft.util.PeerProxyMap; -import org.apache.raft.util.RaftUtils; -import org.apache.raft.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.apache.raft.client.impl.ClientProtoUtils.*; -import static org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT; -import static org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY; - -public class AppendStreamer implements Closeable { - public static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class); - - enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR} - - private static class ExceptionAndRetry { - private final Map<String, IOException> exceptionMap = new HashMap<>(); - private final AtomicInteger retryTimes = new AtomicInteger(0); - private final int maxRetryTimes; - private final long retryInterval; - - ExceptionAndRetry(RaftProperties prop) { - maxRetryTimes = prop.getInt( - RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY, - RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT); - retryInterval = prop.getTimeDuration( - RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY, - RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - } - - void addException(String peer, IOException e) { - exceptionMap.put(peer, e); - retryTimes.incrementAndGet(); - } - - IOException getCombinedException() { - return new IOException("Exceptions: " + exceptionMap); - } - - boolean shouldRetry() { - return retryTimes.get() <= maxRetryTimes; - } - } - - private final Deque<RaftClientRequestProto> dataQueue; - private final Deque<RaftClientRequestProto> ackQueue; - private final int maxPendingNum; - - private final PeerProxyMap<RaftClientProtocolProxy> proxyMap; - private final Map<String, RaftPeer> peers; - private String leaderId; - private volatile RaftClientProtocolProxy leaderProxy; - private final String clientId; - - private volatile RunningState running = RunningState.RUNNING; - private final ExceptionAndRetry exceptionAndRetry; - private final Sender senderThread; - - AppendStreamer(RaftProperties prop, Collection<RaftPeer> peers, - String leaderId, String clientId) { - this.clientId = clientId; - maxPendingNum = prop.getInt( - RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY, - RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT); - dataQueue = new ConcurrentLinkedDeque<>(); - ackQueue = new ConcurrentLinkedDeque<>(); - exceptionAndRetry = new ExceptionAndRetry(prop); - - this.peers = peers.stream().collect( - Collectors.toMap(RaftPeer::getId, Function.identity())); - proxyMap = new PeerProxyMap<>( - raftPeer -> new RaftClientProtocolProxy(raftPeer, ResponseHandler::new)); - proxyMap.addPeers(peers); - refreshLeaderProxy(leaderId, null); - - senderThread = new Sender(); - senderThread.setName(this.toString() + "-sender"); - senderThread.start(); - } - - private synchronized void refreshLeaderProxy(String suggested, - String oldLeader) { - if (suggested != null) { - leaderId = suggested; - } else { - if (oldLeader == null) { - leaderId = peers.keySet().iterator().next(); - } else { - leaderId = StringUtils.next(oldLeader, peers.keySet()); - } - } - LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this, - oldLeader, leaderId, suggested); - if (leaderProxy != null) { - leaderProxy.closeCurrentSession(); - } - try { - leaderProxy = proxyMap.getProxy(leaderId); - } catch (IOException e) { - LOG.error("Should not hit IOException here", e); - refreshLeader(null, leaderId); - } - } - - private boolean isRunning() { - return running == RunningState.RUNNING || - running == RunningState.LOOK_FOR_LEADER; - } - - private void checkState() throws IOException { - if (!isRunning()) { - throwException("The AppendStreamer has been closed"); - } - } - - synchronized void write(ByteString content, long seqNum) - throws IOException { - checkState(); - while (isRunning() && dataQueue.size() >= maxPendingNum) { - try { - wait(); - } catch (InterruptedException ignored) { - } - } - if (isRunning()) { - // wrap the current buffer into a RaftClientRequestProto - final RaftClientRequestProto request = genRaftClientRequestProto( - clientId, leaderId, seqNum, content, false); - dataQueue.offer(request); - this.notifyAll(); - } else { - throwException(this + " got closed."); - } - } - - synchronized void flush() throws IOException { - checkState(); - if (dataQueue.isEmpty() && ackQueue.isEmpty()) { - return; - } - // wait for the pending Q to become empty - while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) { - try { - wait(); - } catch (InterruptedException ignored) { - } - } - if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) { - throwException(this + " got closed before finishing flush"); - } - } - - @Override - public void close() throws IOException { - if (!isRunning()) { - return; - } - flush(); - - running = RunningState.CLOSED; - senderThread.interrupt(); - try { - senderThread.join(); - } catch (InterruptedException ignored) { - } - proxyMap.close(); - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "-" + clientId; - } - - private class Sender extends Daemon { - @Override - public void run() { - while (isRunning()) { - - synchronized (AppendStreamer.this) { - while (isRunning() && shouldWait()) { - try { - AppendStreamer.this.wait(); - } catch (InterruptedException ignored) { - } - } - if (running == RunningState.RUNNING) { - RaftClientRequestProto next = dataQueue.poll(); - leaderProxy.onNext(next); - ackQueue.offer(next); - } - } - } - } - - private boolean shouldWait() { - // the sender should wait if any of the following is true - // 1) there is no data to send - // 2) there are too many outstanding pending requests - // 3) Error/NotLeaderException just happened, we're still waiting for - // the first response to confirm the new leader - return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum || - running == RunningState.LOOK_FOR_LEADER; - } - } - - /** the response handler for stream RPC */ - private class ResponseHandler implements - RaftClientProtocolProxy.CloseableStreamObserver { - private final String targetId; - // once handled the first NotLeaderException or Error, the handler should - // be inactive and should not make any further action. - private volatile boolean active = true; - - ResponseHandler(RaftPeer target) { - targetId = target.getId(); - } - - @Override - public String toString() { - return AppendStreamer.this + "-ResponseHandler-" + targetId; - } - - @Override - public void onNext(RaftClientReplyProto reply) { - if (!active) { - return; - } - synchronized (AppendStreamer.this) { - RaftClientRequestProto pending = Preconditions.checkNotNull( - ackQueue.peek()); - if (reply.getRpcReply().getSuccess()) { - Preconditions.checkState(pending.getRpcRequest().getSeqNum() == - reply.getRpcReply().getSeqNum()); - ackQueue.poll(); - LOG.trace("{} received success ack for request {}", this, - pending.getRpcRequest()); - // we've identified the correct leader - if (running == RunningState.LOOK_FOR_LEADER) { - running = RunningState.RUNNING; - } - } else { - // this may be a NotLeaderException - RaftClientReply r = toRaftClientReply(reply); - if (r.isNotLeader()) { - LOG.debug("{} received a NotLeaderException from {}", this, - r.getReplierId()); - handleNotLeader(r.getNotLeaderException(), targetId); - } - } - AppendStreamer.this.notifyAll(); - } - } - - @Override - public void onError(Throwable t) { - if (active) { - synchronized (AppendStreamer.this) { - handleError(t, this); - AppendStreamer.this.notifyAll(); - } - } - } - - @Override - public void onCompleted() { - LOG.info("{} onCompleted, pending requests #: {}", this, - ackQueue.size()); - } - - @Override // called by handleError and handleNotLeader - public void close() throws IOException { - active = false; - } - } - - private void throwException(String msg) throws IOException { - if (running == RunningState.ERROR) { - throw exceptionAndRetry.getCombinedException(); - } else { - throw new IOException(msg); - } - } - - private void handleNotLeader(NotLeaderException nle, - String oldLeader) { - Preconditions.checkState(Thread.holdsLock(AppendStreamer.this)); - // handle NotLeaderException: refresh leader and RaftConfiguration - refreshPeers(nle.getPeers()); - - refreshLeader(nle.getSuggestedLeader().getId(), oldLeader); - } - - private void handleError(Throwable t, ResponseHandler handler) { - Preconditions.checkState(Thread.holdsLock(AppendStreamer.this)); - final IOException e = RaftGrpcUtil.unwrapIOException(t); - - exceptionAndRetry.addException(handler.targetId, e); - LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.", - handler, e, exceptionAndRetry.retryTimes.get(), - exceptionAndRetry.maxRetryTimes); - - leaderProxy.onError(); - if (exceptionAndRetry.shouldRetry()) { - refreshLeader(null, leaderId); - } else { - running = RunningState.ERROR; - } - } - - private void refreshLeader(String suggestedLeader, String oldLeader) { - running = RunningState.LOOK_FOR_LEADER; - refreshLeaderProxy(suggestedLeader, oldLeader); - reQueuePendingRequests(leaderId); - - final RaftClientRequestProto request = Preconditions.checkNotNull( - dataQueue.poll()); - ackQueue.offer(request); - try { - Thread.sleep(exceptionAndRetry.retryInterval); - } catch (InterruptedException ignored) { - } - leaderProxy.onNext(request); - } - - private void reQueuePendingRequests(String newLeader) { - if (isRunning()) { - // resend all the pending requests - while (!ackQueue.isEmpty()) { - RaftClientRequestProto oldRequest = ackQueue.pollLast(); - RaftRpcRequestProto r = oldRequest.getRpcRequest(); - RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder() - .setMessage(oldRequest.getMessage()) - .setReadOnly(oldRequest.getReadOnly()) - .setRpcRequest(toRaftRpcRequestProtoBuilder( - clientId, newLeader, r.getSeqNum())) - .build(); - dataQueue.offerFirst(newRequest); - } - } - } - - private void refreshPeers(RaftPeer[] newPeers) { - if (newPeers != null && newPeers.length > 0) { - // we only add new peers, we do not remove any peer even if it no longer - // belongs to the current raft conf - Arrays.stream(newPeers).forEach(peer -> { - peers.putIfAbsent(peer.getId(), peer); - proxyMap.putIfAbsent(peer); - }); - - LOG.debug("refreshed peers: {}", peers); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolClient.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolClient.java deleted file mode 100644 index a8372a3..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolClient.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc.client; - -import org.apache.raft.grpc.RaftGrpcUtil; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.shaded.io.grpc.ManagedChannel; -import org.apache.raft.shaded.io.grpc.ManagedChannelBuilder; -import org.apache.raft.shaded.io.grpc.StatusRuntimeException; -import org.apache.raft.shaded.io.grpc.stub.StreamObserver; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.raft.shaded.proto.RaftProtos.SetConfigurationRequestProto; -import org.apache.raft.shaded.proto.grpc.RaftClientProtocolServiceGrpc; -import org.apache.raft.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub; -import org.apache.raft.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub; - -import java.io.Closeable; -import java.io.IOException; - -public class RaftClientProtocolClient implements Closeable { - private final RaftPeer target; - private final ManagedChannel channel; - private final RaftClientProtocolServiceBlockingStub blockingStub; - private final RaftClientProtocolServiceStub asyncStub; - - public RaftClientProtocolClient(RaftPeer target) { - this.target = target; - channel = ManagedChannelBuilder.forTarget(target.getAddress()) - .usePlaintext(true).build(); - blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel); - asyncStub = RaftClientProtocolServiceGrpc.newStub(channel); - } - - @Override - public void close() { - channel.shutdownNow(); - } - - public RaftClientReplyProto setConfiguration( - SetConfigurationRequestProto request) throws IOException { - try { - return blockingStub.setConfiguration(request); - } catch (StatusRuntimeException e) { - // unwrap StatusRuntimeException - throw RaftGrpcUtil.unwrapException(e); - } - } - - StreamObserver<RaftClientRequestProto> append( - StreamObserver<RaftClientReplyProto> responseHandler) { - return asyncStub.append(responseHandler); - } - - public RaftPeer getTarget() { - return target; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolProxy.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolProxy.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolProxy.java deleted file mode 100644 index 01ec023..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolProxy.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc.client; - -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.shaded.io.grpc.stub.StreamObserver; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto; - -import java.io.Closeable; -import java.io.IOException; -import java.util.function.Function; - -public class RaftClientProtocolProxy implements Closeable { - private final RaftClientProtocolClient proxy; - private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation; - private RpcSession currentSession; - - public RaftClientProtocolProxy(RaftPeer target, - Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation) { - proxy = new RaftClientProtocolClient(target); - this.responseHandlerCreation = responseHandlerCreation; - } - - @Override - public void close() throws IOException { - closeCurrentSession(); - proxy.close(); - } - - @Override - public String toString() { - return "ProxyTo:" + proxy.getTarget(); - } - - public void closeCurrentSession() { - if (currentSession != null) { - currentSession.close(); - currentSession = null; - } - } - - public void onNext(RaftClientRequestProto request) { - if (currentSession == null) { - currentSession = new RpcSession( - responseHandlerCreation.apply(proxy.getTarget())); - } - currentSession.requestObserver.onNext(request); - } - - public void onError() { - if (currentSession != null) { - currentSession.onError(); - } - } - - public interface CloseableStreamObserver - extends StreamObserver<RaftClientReplyProto>, Closeable { - } - - class RpcSession implements Closeable { - private final StreamObserver<RaftClientRequestProto> requestObserver; - private final CloseableStreamObserver responseHandler; - private boolean hasError = false; - - RpcSession(CloseableStreamObserver responseHandler) { - this.responseHandler = responseHandler; - this.requestObserver = proxy.append(responseHandler); - } - - void onError() { - hasError = true; - } - - @Override - public void close() { - if (!hasError) { - try { - requestObserver.onCompleted(); - } catch (Exception ignored) { - } - } - try { - responseHandler.close(); - } catch (IOException ignored) { - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java deleted file mode 100644 index 8f41bdc..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc.client; - -import com.google.common.base.Preconditions; -import org.apache.raft.client.impl.ClientProtoUtils; -import org.apache.raft.grpc.RaftGrpcUtil; -import org.apache.raft.protocol.RaftClientAsynchronousProtocol; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.shaded.io.grpc.stub.StreamObserver; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.raft.shaded.proto.RaftProtos.SetConfigurationRequestProto; -import org.apache.raft.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.CompletableFuture; - -public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase { - static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class); - - private static class PendingAppend implements Comparable<PendingAppend> { - private final long seqNum; - private volatile RaftClientReply reply; - - PendingAppend(long seqNum) { - this.seqNum = seqNum; - } - - boolean isReady() { - return reply != null || this == COMPLETED; - } - - void setReply(RaftClientReply reply) { - this.reply = reply; - } - - @Override - public int compareTo(PendingAppend p) { - return seqNum == p.seqNum ? 0 : (seqNum < p.seqNum ? -1 : 1); - } - - @Override - public String toString() { - return seqNum + ", reply:" + (reply == null ? "null" : reply.toString()); - } - } - private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE); - - private final String id; - private final RaftClientAsynchronousProtocol client; - - public RaftClientProtocolService(String id, RaftClientAsynchronousProtocol client) { - this.id = id; - this.client = client; - } - - @Override - public void setConfiguration(SetConfigurationRequestProto request, - StreamObserver<RaftClientReplyProto> responseObserver) { - try { - CompletableFuture<RaftClientReply> future = client.setConfigurationAsync( - ClientProtoUtils.toSetConfigurationRequest(request)); - future.whenCompleteAsync((reply, exception) -> { - if (exception != null) { - responseObserver.onError(RaftGrpcUtil.wrapException(exception)); - } else { - responseObserver.onNext(ClientProtoUtils.toRaftClientReplyProto(reply)); - responseObserver.onCompleted(); - } - }); - } catch (Exception e) { - responseObserver.onError(RaftGrpcUtil.wrapException(e)); - } - } - - @Override - public StreamObserver<RaftClientRequestProto> append( - StreamObserver<RaftClientReplyProto> responseObserver) { - return new AppendRequestStreamObserver(responseObserver); - } - - private class AppendRequestStreamObserver implements - StreamObserver<RaftClientRequestProto> { - private final List<PendingAppend> pendingList = new LinkedList<>(); - private final StreamObserver<RaftClientReplyProto> responseObserver; - - AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) { - this.responseObserver = ro; - } - - @Override - public void onNext(RaftClientRequestProto request) { - try { - PendingAppend p = new PendingAppend(request.getRpcRequest().getSeqNum()); - synchronized (pendingList) { - pendingList.add(p); - } - - CompletableFuture<RaftClientReply> future = client.submitClientRequestAsync( - ClientProtoUtils.toRaftClientRequest(request)); - future.whenCompleteAsync((reply, exception) -> { - if (exception != null) { - // TODO: the exception may be from either raft or state machine. - // Currently we skip all the following responses when getting an - // exception from the state machine. - responseObserver.onError(RaftGrpcUtil.wrapException(exception)); - } else { - final long replySeq = reply.getSeqNum(); - synchronized (pendingList) { - Preconditions.checkState(!pendingList.isEmpty(), - "PendingList is empty when handling onNext for seqNum %s", - replySeq); - final long headSeqNum = pendingList.get(0).seqNum; - // we assume the seqNum is consecutive for a stream RPC call - final PendingAppend pendingForReply = pendingList.get( - (int) (replySeq - headSeqNum)); - Preconditions.checkState(pendingForReply != null && - pendingForReply.seqNum == replySeq, - "pending for reply is: %s, the pending list: %s", - pendingForReply, pendingList); - pendingForReply.setReply(reply); - - if (headSeqNum == replySeq) { - Collection<PendingAppend> readySet = new ArrayList<>(); - // if this is head, we send back all the ready responses - Iterator<PendingAppend> iter = pendingList.iterator(); - PendingAppend pending; - while (iter.hasNext() && ((pending = iter.next()).isReady())) { - readySet.add(pending); - iter.remove(); - } - sendReadyReplies(readySet); - } - } - } - }); - } catch (Throwable e) { - LOG.info("{} got exception when handling client append request {}: {}", - id, request.getRpcRequest(), e); - responseObserver.onError(RaftGrpcUtil.wrapException(e)); - } - } - - private void sendReadyReplies(Collection<PendingAppend> readySet) { - readySet.forEach(ready -> { - Preconditions.checkState(ready.isReady()); - if (ready == COMPLETED) { - responseObserver.onCompleted(); - } else { - responseObserver.onNext( - ClientProtoUtils.toRaftClientReplyProto(ready.reply)); - } - }); - } - - @Override - public void onError(Throwable t) { - // for now we just log a msg - LOG.warn("{} onError: client Append cancelled", id, t); - synchronized (pendingList) { - pendingList.clear(); - } - } - - @Override - public void onCompleted() { - synchronized (pendingList) { - if (pendingList.isEmpty()) { - responseObserver.onCompleted(); - } else { - pendingList.add(COMPLETED); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientSenderWithGrpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientSenderWithGrpc.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientSenderWithGrpc.java deleted file mode 100644 index 7351e1a..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientSenderWithGrpc.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc.client; - -import org.apache.raft.client.RaftClientRequestSender; -import org.apache.raft.grpc.RaftGrpcUtil; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.protocol.SetConfigurationRequest; -import org.apache.raft.shaded.io.grpc.StatusRuntimeException; -import org.apache.raft.shaded.io.grpc.stub.StreamObserver; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.raft.shaded.proto.RaftProtos.SetConfigurationRequestProto; -import org.apache.raft.util.PeerProxyMap; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -import static org.apache.raft.client.impl.ClientProtoUtils.*; - -public class RaftClientSenderWithGrpc implements RaftClientRequestSender { - public static final Logger LOG = LoggerFactory.getLogger(RaftClientSenderWithGrpc.class); - - private final PeerProxyMap<RaftClientProtocolClient> proxies - = new PeerProxyMap<>(RaftClientProtocolClient::new); - - public RaftClientSenderWithGrpc(Collection<RaftPeer> peers) { - addServers(peers); - } - - @Override - public RaftClientReply sendRequest(RaftClientRequest request) - throws IOException { - final String serverId = request.getReplierId(); - final RaftClientProtocolClient proxy = proxies.getProxy(serverId); - if (request instanceof SetConfigurationRequest) { - SetConfigurationRequestProto setConf = - toSetConfigurationRequestProto((SetConfigurationRequest) request); - return toRaftClientReply(proxy.setConfiguration(setConf)); - } else { - RaftClientRequestProto requestProto = toRaftClientRequestProto(request); - CompletableFuture<RaftClientReplyProto> replyFuture = - new CompletableFuture<>(); - final StreamObserver<RaftClientRequestProto> requestObserver = - proxy.append(new StreamObserver<RaftClientReplyProto>() { - @Override - public void onNext(RaftClientReplyProto value) { - replyFuture.complete(value); - } - - @Override - public void onError(Throwable t) { - // This implementation is used as RaftClientRequestSender. Retry - // logic on Exception is in RaftClient. - final IOException e; - if (t instanceof StatusRuntimeException) { - e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); - } else { - e = RaftUtils.asIOException(t); - } - replyFuture.completeExceptionally(e); - } - - @Override - public void onCompleted() { - if (!replyFuture.isDone()) { - replyFuture.completeExceptionally( - new IOException("No reply for request " + request)); - } - } - }); - requestObserver.onNext(requestProto); - requestObserver.onCompleted(); - - // TODO: timeout support - try { - return toRaftClientReply(replyFuture.get()); - } catch (InterruptedException e) { - throw new InterruptedIOException( - "Interrupted while waiting for response of request " + request); - } catch (ExecutionException e) { - throw RaftUtils.toIOException(e); - } - } - } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - proxies.addPeers(servers); - } - - @Override - public void close() throws IOException { - proxies.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftOutputStream.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftOutputStream.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftOutputStream.java deleted file mode 100644 index 7edcab9..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftOutputStream.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc.client; - -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.util.ProtoUtils; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Collection; - -import static org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT; -import static org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY; - -public class RaftOutputStream extends OutputStream { - /** internal buffer */ - private final byte buf[]; - private int count; - private long seqNum = 0; - private final String clientId; - private final AppendStreamer streamer; - - private boolean closed = false; - - public RaftOutputStream(RaftProperties prop, String clientId, - Collection<RaftPeer> peers, String leaderId) { - final int bufferSize = prop.getInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, - RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT); - buf = new byte[bufferSize]; - count = 0; - this.clientId = clientId; - streamer = new AppendStreamer(prop, peers, leaderId, clientId); - } - - @Override - public void write(int b) throws IOException { - checkClosed(); - buf[count++] = (byte)b; - flushIfNecessary(); - } - - private void flushIfNecessary() throws IOException { - if(count == buf.length) { - flushToStreamer(); - } - } - - @Override - public void write(byte b[], int off, int len) throws IOException { - checkClosed(); - if (off < 0 || len < 0 || off > b.length - len) { - throw new ArrayIndexOutOfBoundsException(); - } - - int total = 0; - while (total < len) { - int toWrite = Math.min(len - total, buf.length - count); - System.arraycopy(b, off + total, buf, count, toWrite); - count += toWrite; - total += toWrite; - flushIfNecessary(); - } - } - - private void flushToStreamer() throws IOException { - if (count > 0) { - streamer.write(ProtoUtils.toByteString(buf, 0, count), seqNum++); - count = 0; - } - } - - @Override - public void flush() throws IOException { - checkClosed(); - flushToStreamer(); - streamer.flush(); - } - - @Override - public void close() throws IOException { - flushToStreamer(); - streamer.close(); // streamer will flush - this.closed = true; - } - - @Override - public String toString() { - return "RaftOutputStream-" + clientId; - } - - private void checkClosed() throws IOException { - if (closed) { - throw new IOException(this.toString() + " was closed."); - } - } -}
