http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java new file mode 100644 index 0000000..5f1d901 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -0,0 +1,416 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import com.google.common.base.Preconditions; + +import org.apache.ratis.shaded.io.grpc.Status; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.grpc.RaftGRpcService; +import org.apache.ratis.grpc.RaftGrpcConfigKeys; +import org.apache.ratis.server.impl.FollowerInfo; +import org.apache.ratis.server.impl.LeaderState; +import org.apache.ratis.server.impl.LogAppender; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.util.CodeInjectionForTesting; + +import static org.apache.ratis.grpc.RaftGRpcService.GRPC_SEND_SERVER_REQUEST; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A new log appender implementation using grpc bi-directional stream API. + */ +public class GRpcLogAppender extends LogAppender { + private final RaftServerProtocolClient client; + private final Queue<AppendEntriesRequestProto> pendingRequests; + private final int maxPendingRequestsNum; + private volatile boolean firstResponseReceived = false; + + private final AppendLogResponseHandler appendResponseHandler; + private final InstallSnapshotResponseHandler snapshotResponseHandler; + + private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver; + private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver; + + public GRpcLogAppender(RaftServerImpl server, LeaderState leaderState, + FollowerInfo f) { + super(server, leaderState, f); + + RaftGRpcService rpcService = (RaftGRpcService) server.getServerRpc(); + client = rpcService.getRpcClient(f.getPeer()); + maxPendingRequestsNum = server.getProperties().getInt( + RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY, + RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT); + pendingRequests = new ConcurrentLinkedQueue<>(); + + appendResponseHandler = new AppendLogResponseHandler(); + snapshotResponseHandler = new InstallSnapshotResponseHandler(); + } + + @Override + public void run() { + while (isAppenderRunning()) { + if (shouldSendRequest()) { + SnapshotInfo snapshot = shouldInstallSnapshot(); + if (snapshot != null) { + installSnapshot(snapshot, snapshotResponseHandler); + } else { + // keep appending log entries or sending heartbeats + appendLog(); + } + } + + if (isAppenderRunning() && !shouldSendRequest()) { + // use lastSend time instead of lastResponse time + final long waitTime = getHeartbeatRemainingTime( + follower.getLastRpcTime()); + if (waitTime > 0) { + synchronized (this) { + try { + LOG.debug("{} decides to wait {}ms before appending to {}", + server.getId(), waitTime, follower.getPeer()); + wait(waitTime); + } catch (InterruptedException ignored) { + } + } + } + } + } + appendLogRequestObserver.onCompleted(); + } + + private boolean shouldWait() { + return pendingRequests.size() >= maxPendingRequestsNum || + shouldWaitForFirstResponse(); + } + + private void appendLog() { + if (appendLogRequestObserver == null) { + appendLogRequestObserver = client.appendEntries(appendResponseHandler); + } + AppendEntriesRequestProto pending = null; + final StreamObserver<AppendEntriesRequestProto> s; + synchronized (this) { + // if the queue's size >= maxSize, wait + while (isAppenderRunning() && shouldWait()) { + try { + LOG.debug("{} wait to send the next AppendEntries to {}", + server.getId(), follower.getPeer()); + this.wait(); + } catch (InterruptedException ignored) { + } + } + + if (isAppenderRunning()) { + // prepare and enqueue the append request. note changes on follower's + // nextIndex and ops on pendingRequests should always be associated + // together and protected by the lock + pending = createRequest(); + if (pending != null) { + Preconditions.checkState(pendingRequests.offer(pending)); + updateNextIndex(pending); + } + } + s = appendLogRequestObserver; + } + + if (pending != null && isAppenderRunning()) { + sendRequest(pending, s); + } + } + + private void sendRequest(AppendEntriesRequestProto request, + StreamObserver<AppendEntriesRequestProto> s) { + CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, server.getId(), + null, request); + + s.onNext(request); + follower.updateLastRpcSendTime(); + } + + private void updateNextIndex(AppendEntriesRequestProto request) { + final int count = request.getEntriesCount(); + if (count > 0) { + follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1); + } + } + + /** + * if this is the first append, wait for the response of the first append so + * that we can get the correct next index. + */ + private boolean shouldWaitForFirstResponse() { + return pendingRequests.size() > 0 && !firstResponseReceived; + } + + /** + * StreamObserver for handling responses from the follower + */ + private class AppendLogResponseHandler + implements StreamObserver<AppendEntriesReplyProto> { + /** + * After receiving a appendEntries reply, do the following: + * 1. If the reply is success, update the follower's match index and submit + * an event to leaderState + * 2. If the reply is NOT_LEADER, step down + * 3. If the reply is INCONSISTENCY, decrease the follower's next index + * based on the response + */ + @Override + public void onNext(AppendEntriesReplyProto reply) { + LOG.debug("{} received {} response from {}", server.getId(), + (!firstResponseReceived ? "the first" : "a"), + follower.getPeer()); + + // update the last rpc time + follower.updateLastRpcResponseTime(); + + if (!firstResponseReceived) { + firstResponseReceived = true; + } + switch (reply.getResult()) { + case SUCCESS: + onSuccess(reply); + break; + case NOT_LEADER: + onNotLeader(reply); + break; + case INCONSISTENCY: + onInconsistency(reply); + break; + default: + break; + } + notifyAppend(); + } + + /** + * for now we simply retry the first pending request + */ + @Override + public void onError(Throwable t) { + if (!isAppenderRunning()) { + LOG.info("{} is stopped", GRpcLogAppender.this); + return; + } + LOG.warn("{} got error when appending entries to {}, exception: {}.", + server.getId(), follower.getPeer().getId(), t); + + synchronized (this) { + final Status cause = Status.fromThrowable(t); + if (cause != null && cause.getCode() == Status.Code.INTERNAL) { + // TODO check other Status. Add sleep to avoid tight loop + LOG.debug("{} restarts Append call to {} due to error {}", + server.getId(), follower.getPeer(), t); + // recreate the StreamObserver + appendLogRequestObserver = client.appendEntries(appendResponseHandler); + // reset firstResponseReceived to false + firstResponseReceived = false; + } + + // clear the pending requests queue and reset the next index of follower + AppendEntriesRequestProto request = pendingRequests.peek(); + if (request != null) { + final long nextIndex = request.hasPreviousLog() ? + request.getPreviousLog().getIndex() + 1 : raftLog.getStartIndex(); + clearPendingRequests(nextIndex); + } + } + } + + @Override + public void onCompleted() { + LOG.info("{} stops appending log entries to follower {}", server.getId(), + follower); + } + } + + private void clearPendingRequests(long newNextIndex) { + pendingRequests.clear(); + follower.decreaseNextIndex(newNextIndex); + } + + private void onSuccess(AppendEntriesReplyProto reply) { + AppendEntriesRequestProto request = pendingRequests.poll(); + final long replyNextIndex = reply.getNextIndex(); + Preconditions.checkNotNull(request, + "Got reply with next index %s but the pending queue is empty", + replyNextIndex); + + if (request.getEntriesCount() == 0) { + Preconditions.checkState(!request.hasPreviousLog() || + replyNextIndex - 1 == request.getPreviousLog().getIndex(), + "reply's next index is %s, request's previous is %s", + replyNextIndex, request.getPreviousLog()); + } else { + // check if the reply and the pending request is consistent + final long lastEntryIndex = request + .getEntries(request.getEntriesCount() - 1).getIndex(); + Preconditions.checkState(replyNextIndex == lastEntryIndex + 1, + "reply's next index is %s, request's last entry index is %s", + replyNextIndex, lastEntryIndex); + follower.updateMatchIndex(lastEntryIndex); + submitEventOnSuccessAppend(); + } + } + + private void onNotLeader(AppendEntriesReplyProto reply) { + checkResponseTerm(reply.getTerm()); + // the running loop will end and the connection will onComplete + } + + private synchronized void onInconsistency(AppendEntriesReplyProto reply) { + AppendEntriesRequestProto request = pendingRequests.peek(); + Preconditions.checkState(request.hasPreviousLog()); + if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) { + clearPendingRequests(reply.getNextIndex()); + } + } + + private class InstallSnapshotResponseHandler + implements StreamObserver<InstallSnapshotReplyProto> { + private final Queue<Integer> pending; + private final AtomicBoolean done = new AtomicBoolean(false); + + InstallSnapshotResponseHandler() { + pending = new LinkedList<>(); + } + + synchronized void addPending(InstallSnapshotRequestProto request) { + pending.offer(request.getRequestIndex()); + } + + synchronized void removePending(InstallSnapshotReplyProto reply) { + int index = pending.poll(); + Preconditions.checkState(index == reply.getRequestIndex()); + } + + boolean isDone() { + return done.get(); + } + + void close() { + done.set(true); + GRpcLogAppender.this.notifyAppend(); + } + + synchronized boolean hasAllResponse() { + return pending.isEmpty(); + } + + @Override + public void onNext(InstallSnapshotReplyProto reply) { + LOG.debug("{} received {} response from {}", server.getId(), + (!firstResponseReceived ? "the first" : "a"), + follower.getPeer()); + + // update the last rpc time + follower.updateLastRpcResponseTime(); + + if (!firstResponseReceived) { + firstResponseReceived = true; + } + + switch (reply.getResult()) { + case SUCCESS: + removePending(reply); + break; + case NOT_LEADER: + checkResponseTerm(reply.getTerm()); + break; + case UNRECOGNIZED: + break; + } + } + + @Override + public void onError(Throwable t) { + if (!isAppenderRunning()) { + LOG.info("{} is stopped", GRpcLogAppender.this); + return; + } + LOG.info("{} got error when installing snapshot to {}, exception: {}", + server.getId(), follower.getPeer(), t); + close(); + } + + @Override + public void onCompleted() { + LOG.info("{} stops sending snapshots to follower {}", server.getId(), + follower); + close(); + } + } + + private void installSnapshot(SnapshotInfo snapshot, + InstallSnapshotResponseHandler responseHandler) { + LOG.info("{}: follower {}'s next index is {}," + + " log's start index is {}, need to install snapshot", + server.getId(), follower.getPeer(), follower.getNextIndex(), + raftLog.getStartIndex()); + + snapshotRequestObserver = client.installSnapshot(snapshotResponseHandler); + final String requestId = UUID.randomUUID().toString(); + try { + for (InstallSnapshotRequestProto request : + new SnapshotRequestIter(snapshot, requestId)) { + if (isAppenderRunning()) { + snapshotRequestObserver.onNext(request); + follower.updateLastRpcSendTime(); + responseHandler.addPending(request); + } else { + break; + } + } + snapshotRequestObserver.onCompleted(); + } catch (Exception e) { + LOG.warn("{} failed to install snapshot {}. Exception: {}", this, + snapshot.getFiles(), e); + snapshotRequestObserver.onError(e); + return; + } finally { + snapshotRequestObserver = null; + } + + synchronized (this) { + while (isAppenderRunning() && !responseHandler.isDone()) { + try { + wait(); + } catch (InterruptedException ignored) { + } + } + } + + if (responseHandler.hasAllResponse()) { + follower.updateMatchIndex(snapshot.getTermIndex().getIndex()); + follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1); + LOG.info("{}: install snapshot-{} successfully on follower {}", + server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer()); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java new file mode 100644 index 0000000..d30b391 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.server.impl.FollowerInfo; +import org.apache.ratis.server.impl.LeaderState; +import org.apache.ratis.server.impl.LogAppender; +import org.apache.ratis.server.impl.LogAppenderFactory; +import org.apache.ratis.server.impl.RaftServerImpl; + +public class PipelinedLogAppenderFactory implements LogAppenderFactory { + @Override + public LogAppender getLogAppender(RaftServerImpl server, LeaderState state, + FollowerInfo f) { + return new GRpcLogAppender(server, state, f); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java new file mode 100644 index 0000000..53df10d --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.shaded.io.grpc.ManagedChannel; +import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.*; +import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc; +import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub; +import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub; +import org.apache.ratis.protocol.RaftPeer; + +/** + * This is a RaftClient implementation that supports streaming data to the raft + * ring. The stream implementation utilizes gRPC. + */ +public class RaftServerProtocolClient { + private final ManagedChannel channel; + private final RaftServerProtocolServiceBlockingStub blockingStub; + private final RaftServerProtocolServiceStub asyncStub; + + public RaftServerProtocolClient(RaftPeer target) { + channel = ManagedChannelBuilder.forTarget(target.getAddress()) + .usePlaintext(true).build(); + blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); + asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); + } + + public void shutdown() { + channel.shutdownNow(); + } + + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) { + // the StatusRuntimeException will be handled by the caller + return blockingStub.requestVote(request); + } + + StreamObserver<AppendEntriesRequestProto> appendEntries( + StreamObserver<AppendEntriesReplyProto> responseHandler) { + return asyncStub.appendEntries(responseHandler); + } + + StreamObserver<InstallSnapshotRequestProto> installSnapshot( + StreamObserver<InstallSnapshotReplyProto> responseHandler) { + return asyncStub.installSnapshot(responseHandler); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java new file mode 100644 index 0000000..08e6a51 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.server.protocol.RaftServerProtocol; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase { + public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class); + + private final String id; + private final RaftServerProtocol server; + + public RaftServerProtocolService(String id, RaftServerProtocol server) { + this.id = id; + this.server = server; + } + + @Override + public void requestVote(RequestVoteRequestProto request, + StreamObserver<RequestVoteReplyProto> responseObserver) { + try { + final RequestVoteReplyProto reply = server.requestVote(request); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Throwable e) { + LOG.info("{} got exception when handling requestVote {}: {}", + id, request.getServerRequest(), e); + responseObserver.onError(RaftGrpcUtil.wrapException(e)); + } + } + + @Override + public StreamObserver<AppendEntriesRequestProto> appendEntries( + StreamObserver<AppendEntriesReplyProto> responseObserver) { + return new StreamObserver<AppendEntriesRequestProto>() { + @Override + public void onNext(AppendEntriesRequestProto request) { + try { + final AppendEntriesReplyProto reply = server.appendEntries(request); + responseObserver.onNext(reply); + } catch (Throwable e) { + LOG.info("{} got exception when handling appendEntries {}: {}", + id, request.getServerRequest(), e); + responseObserver.onError(RaftGrpcUtil.wrapException(e)); + } + } + + @Override + public void onError(Throwable t) { + // for now we just log a msg + LOG.info("{}: appendEntries on error. Exception: {}", id, t); + } + + @Override + public void onCompleted() { + LOG.info("{}: appendEntries completed", id); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver<InstallSnapshotRequestProto> installSnapshot( + StreamObserver<InstallSnapshotReplyProto> responseObserver) { + return new StreamObserver<InstallSnapshotRequestProto>() { + @Override + public void onNext(InstallSnapshotRequestProto request) { + try { + final InstallSnapshotReplyProto reply = server.installSnapshot(request); + responseObserver.onNext(reply); + } catch (Throwable e) { + LOG.info("{} got exception when handling installSnapshot {}: {}", + id, request.getServerRequest(), e); + responseObserver.onError(RaftGrpcUtil.wrapException(e)); + } + } + + @Override + public void onError(Throwable t) { + LOG.info("{}: installSnapshot on error. Exception: {}", id, t); + } + + @Override + public void onCompleted() { + LOG.info("{}: installSnapshot completed", id); + responseObserver.onCompleted(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java new file mode 100644 index 0000000..f5c7b3f --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import com.google.common.base.Preconditions; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.RaftGRpcService; +import org.apache.ratis.grpc.RaftGrpcConfigKeys; +import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc; +import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.impl.BlockRequestHandlingInjection; +import org.apache.ratis.server.impl.DelayLocalExecutionInjection; +import org.apache.ratis.server.impl.LogAppenderFactory; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.NetUtils; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { + public static final Factory<MiniRaftClusterWithGRpc> FACTORY + = new Factory<MiniRaftClusterWithGRpc>() { + @Override + public MiniRaftClusterWithGRpc newCluster( + String[] ids, RaftProperties prop, boolean formatted) throws IOException { + return new MiniRaftClusterWithGRpc(ids, prop, formatted); + } + }; + + public static final DelayLocalExecutionInjection sendServerRequestInjection = + new DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST); + + public MiniRaftClusterWithGRpc(int numServers, RaftProperties properties) + throws IOException { + this(generateIds(numServers, 0), properties, true); + } + + public MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties, + boolean formatted) throws IOException { + super(ids, getPropForGrpc(properties), formatted); + init(initRpcServices(getServers(), properties)); + } + + private static RaftProperties getPropForGrpc(RaftProperties prop) { + RaftProperties newProp = new RaftProperties(prop); + newProp.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, + PipelinedLogAppenderFactory.class, LogAppenderFactory.class); + return newProp; + } + + private static Map<RaftPeer, RaftGRpcService> initRpcServices( + Collection<RaftServerImpl> servers, RaftProperties prop) throws IOException { + final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>(); + + for (RaftServerImpl s : servers) { + final RaftGRpcService rpc = new RaftGRpcService(s, prop); + peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); + } + return peerRpcs; + } + + @Override + public RaftClientRequestSender getRaftClientRequestSender() { + return new RaftClientSenderWithGrpc(getPeers()); + } + + @Override + protected Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, + Collection<RaftServerImpl> newServers, boolean startService) + throws IOException { + final Map<RaftPeer, RaftGRpcService> peers = initRpcServices(newServers, properties); + for (Map.Entry<RaftPeer, RaftGRpcService> entry : peers.entrySet()) { + RaftServerImpl server = servers.get(entry.getKey().getId()); + server.setServerRpc(entry.getValue()); + if (!startService) { + BlockRequestHandlingInjection.getInstance().blockReplier(server.getId()); + } else { + server.start(); + } + } + return new ArrayList<>(peers.keySet()); + } + + @Override + protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { + RaftServerImpl server = servers.get(peer.getId()); + int port = NetUtils.newInetSocketAddress(peer.getAddress()).getPort(); + int oldPort = properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, + RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT); + properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, port); + final RaftGRpcService rpc = new RaftGRpcService(server, properties); + Preconditions.checkState( + rpc.getInetSocketAddress().toString().contains(peer.getAddress()), + "address in the raft conf: %s, address in rpc server: %s", + peer.getAddress(), rpc.getInetSocketAddress().toString()); + server.setServerRpc(rpc); + properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, oldPort); + return server; + } + + @Override + public void startServer(String id) { + super.startServer(id); + BlockRequestHandlingInjection.getInstance().unblockReplier(id); + } + + @Override + protected void blockQueueAndSetDelay(String leaderId, int delayMs) + throws InterruptedException { + RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection, + leaderId, delayMs, getMaxTimeout()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java new file mode 100644 index 0000000..351e406 --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftNotLeaderExceptionBaseTest; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; +import org.apache.ratis.server.impl.LogAppenderFactory; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; + +import java.io.IOException; + +public class TestNotLeaderExceptionWithGrpc extends RaftNotLeaderExceptionBaseTest { + @Override + public MiniRaftCluster initCluster() throws IOException { + String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0); + RaftProperties prop = new RaftProperties(); + prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, + PipelinedLogAppenderFactory.class, LogAppenderFactory.class); + return new MiniRaftClusterWithGRpc(s, prop, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java new file mode 100644 index 0000000..450eb6e --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.log4j.Level; +import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; +import org.apache.ratis.grpc.server.RaftServerProtocolService; +import org.apache.ratis.server.impl.LogAppenderFactory; +import org.apache.ratis.server.impl.RaftReconfigurationBaseTest; +import org.apache.ratis.util.RaftUtils; +import org.junit.BeforeClass; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; + +import java.io.IOException; + +public class TestRaftReconfigurationWithGRpc extends RaftReconfigurationBaseTest { + static { + RaftUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG); + } + + @BeforeClass + public static void setProp() { + prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, + PipelinedLogAppenderFactory.class, LogAppenderFactory.class); + } + + @Override + public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException { + return new MiniRaftClusterWithGRpc(peerNum, prop); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java new file mode 100644 index 0000000..8a26435 --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.statemachine.RaftSnapshotBaseTest; + +import java.io.IOException; + +public class TestRaftSnapshotWithGrpc extends RaftSnapshotBaseTest { + @Override + public MiniRaftCluster initCluster(int numServer, RaftProperties prop) + throws IOException { + return MiniRaftClusterWithGRpc.FACTORY.newCluster(numServer, prop, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java new file mode 100644 index 0000000..f4c8d27 --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.log4j.Level; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.client.AppendStreamer; +import org.apache.ratis.grpc.client.RaftOutputStream; +import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; +import org.apache.ratis.server.impl.LogAppenderFactory; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.util.RaftUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import static org.apache.ratis.RaftTestUtil.waitForLeader; +import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; +import static org.junit.Assert.fail; + +public class TestRaftStream { + static { + RaftUtils.setLogLevel(AppendStreamer.LOG, Level.ALL); + } + static final Logger LOG = LoggerFactory.getLogger(TestRaftStream.class); + + private static final RaftProperties prop = new RaftProperties(); + private static final int NUM_SERVERS = 3; + + private MiniRaftClusterWithGRpc cluster; + + + @BeforeClass + public static void setProp() { + prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, + PipelinedLogAppenderFactory.class, LogAppenderFactory.class); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private byte[] genContent(int count) { + return toBytes(count); + } + + private byte[] toBytes(int i) { + byte[] b = new byte[4]; + b[0] = (byte) ((i >>> 24) & 0xFF); + b[1] = (byte) ((i >>> 16) & 0xFF); + b[2] = (byte) ((i >>> 8) & 0xFF); + b[3] = (byte) (i & 0xFF); + return b; + } + + @Test + public void testSimpleWrite() throws Exception { + LOG.info("Running testSimpleWrite"); + + // default 64K is too large for a test + prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4); + cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); + + cluster.start(); + RaftServerImpl leader = waitForLeader(cluster); + + int count = 1; + try (RaftOutputStream out = new RaftOutputStream(prop, "writer-1", + cluster.getPeers(), leader.getId())) { + for (int i = 0; i < 500; i++) { // generate 500 requests + out.write(genContent(count++)); + } + } + + // check the leader's raft log + final RaftLog raftLog = leader.getState().getLog(); + final AtomicInteger currentNum = new AtomicInteger(1); + checkLog(raftLog, 500, () -> { + int value = currentNum.getAndIncrement(); + return toBytes(value); + }); + } + + private void checkLog(RaftLog raftLog, long expectedCommittedIndex, + Supplier<byte[]> s) { + long committedIndex = raftLog.getLastCommittedIndex(); + Assert.assertEquals(expectedCommittedIndex, committedIndex); + // check the log content + LogEntryProto[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1); + for (LogEntryProto entry : entries) { + byte[] logData = entry.getSmLogEntry().getData().toByteArray(); + byte[] expected = s.get(); + Assert.assertEquals("log entry: " + entry, + expected.length, logData.length); + Assert.assertArrayEquals(expected, logData); + } + } + + @Test + public void testWriteAndFlush() throws Exception { + LOG.info("Running testWriteAndFlush"); + + prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE); + cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); + cluster.start(); + + RaftServerImpl leader = waitForLeader(cluster); + RaftOutputStream out = new RaftOutputStream(prop, "writer", + cluster.getPeers(), leader.getId()); + + int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072}; + ByteValue[] values = new ByteValue[lengths.length]; + for (int i = 0; i < values.length; i++) { + values[i] = new ByteValue(lengths[i], (byte) 9); + } + + List<byte[]> expectedTxs = new ArrayList<>(); + for (ByteValue v : values) { + byte[] data = v.genData(); + expectedTxs.addAll(v.getTransactions()); + out.write(data); + out.flush(); + + // make sure after the flush the data has been committed + Assert.assertEquals(expectedTxs.size(), + leader.getState().getLastAppliedIndex()); + } + out.close(); + + try { + out.write(0); + fail("The OutputStream has been closed"); + } catch (IOException ignored) { + } + + LOG.info("Start to check leader's log"); + final AtomicInteger index = new AtomicInteger(0); + checkLog(leader.getState().getLog(), expectedTxs.size(), + () -> expectedTxs.get(index.getAndIncrement())); + } + + private static class ByteValue { + final static int BUFFERSIZE = 1024; + + final int length; + final byte value; + final int numTx; + byte[] data; + + ByteValue(int length, byte value) { + this.length = length; + this.value = value; + numTx = (length - 1) / BUFFERSIZE + 1; + } + + byte[] genData() { + data = new byte[length]; + Arrays.fill(data, value); + return data; + } + + Collection<byte[]> getTransactions() { + if (data.length <= BUFFERSIZE) { + return Collections.singletonList(data); + } else { + List<byte[]> list = new ArrayList<>(); + for (int i = 0; i < numTx; i++) { + int txSize = Math.min(BUFFERSIZE, length - BUFFERSIZE * i); + byte[] t = new byte[txSize]; + Arrays.fill(t, value); + list.add(t); + } + return list; + } + } + } + + @Test + public void testWriteWithOffset() throws Exception { + LOG.info("Running testWriteWithOffset"); + prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE); + + cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); + cluster.start(); + RaftServerImpl leader = waitForLeader(cluster); + + RaftOutputStream out = new RaftOutputStream(prop, "writer", + cluster.getPeers(), leader.getId()); + + byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2]; + Arrays.fill(b1, (byte) 1); + byte[] b2 = new byte[ByteValue.BUFFERSIZE]; + Arrays.fill(b2, (byte) 2); + byte[] b3 = new byte[ByteValue.BUFFERSIZE * 2 + ByteValue.BUFFERSIZE / 2]; + Arrays.fill(b3, (byte) 3); + byte[] b4 = new byte[ByteValue.BUFFERSIZE * 4]; + Arrays.fill(b3, (byte) 4); + + byte[] expected = new byte[ByteValue.BUFFERSIZE * 8]; + byte[][] data = new byte[][]{b1, b2, b3, b4}; + final Random random = new Random(); + int totalSize = 0; + for (byte[] b : data) { + System.arraycopy(b, 0, expected, totalSize, b.length); + totalSize += b.length; + + int written = 0; + while (written < b.length) { + int toWrite = random.nextInt(b.length - written) + 1; + LOG.info("write {} bytes", toWrite); + out.write(b, written, toWrite); + written += toWrite; + } + } + out.close(); + + final RaftLog log = leader.getState().getLog(); + // 0.5 + 1 + 2.5 + 4 = 8 + Assert.assertEquals(8, leader.getState().getLastAppliedIndex()); + Assert.assertEquals(8, log.getLastCommittedIndex()); + LogEntryProto[] entries = log.getEntries(1, 9); + byte[] actual = new byte[ByteValue.BUFFERSIZE * 8]; + totalSize = 0; + for (LogEntryProto e : entries) { + byte[] eValue = e.getSmLogEntry().getData().toByteArray(); + Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length); + System.arraycopy(eValue, 0, actual, totalSize, eValue.length); + totalSize += eValue.length; + } + Assert.assertArrayEquals(expected, actual); + } + + /** + * Write while leader is killed + */ + @Test + public void testKillLeader() throws Exception { + LOG.info("Running testChangeLeader"); + + prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4); + cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); + cluster.start(); + final RaftServerImpl leader = waitForLeader(cluster); + + final AtomicBoolean running = new AtomicBoolean(true); + final AtomicBoolean success = new AtomicBoolean(false); + final AtomicInteger result = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + + new Thread(() -> { + LOG.info("Writer thread starts"); + int count = 0; + try (RaftOutputStream out = new RaftOutputStream(prop, "writer", + cluster.getPeers(), leader.getId())) { + while (running.get()) { + out.write(toBytes(count++)); + Thread.sleep(10); + } + success.set(true); + result.set(count); + } catch (Exception e) { + LOG.info("Got exception when writing", e); + success.set(false); + } finally { + latch.countDown(); + } + }).start(); + + // force change the leader + RaftTestUtil.waitAndKillLeader(cluster, true); + final RaftServerImpl newLeader = waitForLeader(cluster); + Assert.assertNotEquals(leader.getId(), newLeader.getId()); + Thread.sleep(500); + + running.set(false); + latch.await(5, TimeUnit.SECONDS); + Assert.assertTrue(success.get()); + // total number of tx should be >= result + 2, where 2 means two NoOp from + // leaders. It may be larger than result+2 because the client may resend + // requests and we do not have retry cache on servers yet. + LOG.info("last applied index: {}. total number of requests: {}", + newLeader.getState().getLastAppliedIndex(), result.get()); + Assert.assertTrue( + newLeader.getState().getLastAppliedIndex() >= result.get() + 1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java new file mode 100644 index 0000000..b60e30d --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.log4j.Level; +import org.apache.ratis.RaftBasicTests; +import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; +import org.apache.ratis.server.impl.BlockRequestHandlingInjection; +import org.apache.ratis.server.impl.LogAppenderFactory; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.RaftUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; + +import java.io.IOException; + +public class TestRaftWithGrpc extends RaftBasicTests { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + } + + private final MiniRaftClusterWithGRpc cluster; + + @BeforeClass + public static void setProp() { + properties.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, + PipelinedLogAppenderFactory.class, LogAppenderFactory.class); + } + + public TestRaftWithGrpc() throws IOException { + cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, properties); + Assert.assertNull(cluster.getLeader()); + } + + @Override + public MiniRaftClusterWithGRpc getCluster() { + return cluster; + } + + @Override + @Test + public void testEnforceLeader() throws Exception { + super.testEnforceLeader(); + + MiniRaftClusterWithGRpc.sendServerRequestInjection.clear(); + BlockRequestHandlingInjection.getInstance().unblockAll(); + } + + @Override + @Test + public void testWithLoad() throws Exception { + super.testWithLoad(); + BlockRequestHandlingInjection.getInstance().unblockAll(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/resources/log4j.properties b/ratis-grpc/src/test/resources/log4j.properties new file mode 100644 index 0000000..ced0687 --- /dev/null +++ b/ratis-grpc/src/test/resources/log4j.properties @@ -0,0 +1,18 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-hadoop/pom.xml b/ratis-hadoop/pom.xml new file mode 100644 index 0000000..ab7fd9f --- /dev/null +++ b/ratis-hadoop/pom.xml @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>ratis-project-dist</artifactId> + <groupId>org.apache.ratis</groupId> + <version>1.0-SNAPSHOT</version> + <relativePath>../ratis-project-dist</relativePath> + </parent> + + <artifactId>ratis-hadoop</artifactId> + <name>Ratis Hadoop Support</name> + + <dependencies> + <dependency> + <artifactId>ratis-proto-shaded</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java b/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java new file mode 100644 index 0000000..a5a9654 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputOutputStream; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.RPC.RpcInvoker; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.ProtoUtil; +import org.apache.hadoop.util.Time; +import org.apache.ratis.shaded.com.google.protobuf.*; +import org.apache.ratis.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.ratis.shaded.org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto; +import org.apache.ratis.shaded.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; +import org.apache.ratis.shaded.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; + +import javax.net.SocketFactory; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Copied from {@link org.apache.hadoop.ipc.ProtobufRpcEngine} + * and replaced the protobuf classes with the shaded classes. + */ [email protected] +public class ProtobufRpcEngineShaded implements RpcEngine { + public static final Log LOG = LogFactory.getLog(ProtobufRpcEngineShaded.class); + + static { // Register the rpcRequest deserializer for WritableRpcEngine + org.apache.hadoop.ipc.Server.registerProtocolEngine( + RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class, + new Server.ProtoBufRpcInvoker()); + } + + private static final ClientCache CLIENTS = new ClientCache(); + + public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, + rpcTimeout, null); + } + + @Override + public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy + ) throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, + rpcTimeout, connectionRetryPolicy, null); + } + + @Override + @SuppressWarnings("unchecked") + public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + + final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, + rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth); + return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[]{protocol}, invoker), false); + } + + @Override + public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class; + return new ProtocolProxy<ProtocolMetaInfoPB>(protocol, + (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(), + new Class[] { protocol }, new Invoker(protocol, connId, conf, + factory)), false); + } + + private static class Invoker implements RpcInvocationHandler { + private final Map<String, Message> returnTypes = + new ConcurrentHashMap<String, Message>(); + private boolean isClosed = false; + private final Client.ConnectionId remoteId; + private final Client client; + private final long clientProtocolVersion; + private final String protocolName; + private AtomicBoolean fallbackToSimpleAuth; + + private Invoker(Class<?> protocol, InetSocketAddress addr, + UserGroupInformation ticket, Configuration conf, SocketFactory factory, + int rpcTimeout, RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + this(protocol, Client.ConnectionId.getConnectionId( + addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), + conf, factory); + this.fallbackToSimpleAuth = fallbackToSimpleAuth; + } + + /** + * This constructor takes a connectionId, instead of creating a new one. + */ + private Invoker(Class<?> protocol, Client.ConnectionId connId, + Configuration conf, SocketFactory factory) { + this.remoteId = connId; + this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class); + this.protocolName = RPC.getProtocolName(protocol); + this.clientProtocolVersion = RPC + .getProtocolVersion(protocol); + } + + private RequestHeaderProto constructRpcRequestHeader(Method method) { + RequestHeaderProto.Builder builder = RequestHeaderProto + .newBuilder(); + builder.setMethodName(method.getName()); + + + // For protobuf, {@code protocol} used when creating client side proxy is + // the interface extending BlockingInterface, which has the annotations + // such as ProtocolName etc. + // + // Using Method.getDeclaringClass(), as in WritableEngine to get at + // the protocol interface will return BlockingInterface, from where + // the annotation ProtocolName and Version cannot be + // obtained. + // + // Hence we simply use the protocol class used to create the proxy. + // For PB this may limit the use of mixins on client side. + builder.setDeclaringClassProtocolName(protocolName); + builder.setClientProtocolVersion(clientProtocolVersion); + return builder.build(); + } + + /** + * This is the client side invoker of RPC method. It only throws + * ServiceException, since the invocation proxy expects only + * ServiceException to be thrown by the method in case protobuf service. + * + * ServiceException has the following causes: + * <ol> + * <li>Exceptions encountered on the client side in this method are + * set as cause in ServiceException as is.</li> + * <li>Exceptions from the server are wrapped in RemoteException and are + * set as cause in ServiceException</li> + * </ol> + * + * Note that the client calling protobuf RPC methods, must handle + * ServiceException by getting the cause from the ServiceException. If the + * cause is RemoteException, then unwrap it to get the exception thrown by + * the server. + */ + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws ServiceException { + long startTime = 0; + if (LOG.isDebugEnabled()) { + startTime = Time.now(); + } + + if (args.length != 2) { // RpcController + Message + throw new ServiceException("Too many parameters for request. Method: [" + + method.getName() + "]" + ", Expected: 2, Actual: " + + args.length); + } + if (args[1] == null) { + throw new ServiceException("null param while calling Method: [" + + method.getName() + "]"); + } + + RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method); + + if (LOG.isTraceEnabled()) { + LOG.trace(Thread.currentThread().getId() + ": Call -> " + + remoteId + ": " + method.getName() + + " {" + TextFormat.shortDebugString((Message) args[1]) + "}"); + } + + + Message theRequest = (Message) args[1]; + final RpcResponseWrapper val; + try { + val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId, + fallbackToSimpleAuth); + + } catch (Throwable e) { + if (LOG.isTraceEnabled()) { + LOG.trace(Thread.currentThread().getId() + ": Exception <- " + + remoteId + ": " + method.getName() + + " {" + e + "}"); + } + throw new ServiceException(e); + } + + if (LOG.isDebugEnabled()) { + long callTime = Time.now() - startTime; + LOG.debug("Call: " + method.getName() + " took " + callTime + "ms"); + } + + Message prototype = null; + try { + prototype = getReturnProtoType(method); + } catch (Exception e) { + throw new ServiceException(e); + } + Message returnMessage; + try { + returnMessage = prototype.newBuilderForType() + .mergeFrom(val.theResponseRead).build(); + + if (LOG.isTraceEnabled()) { + LOG.trace(Thread.currentThread().getId() + ": Response <- " + + remoteId + ": " + method.getName() + + " {" + TextFormat.shortDebugString(returnMessage) + "}"); + } + + } catch (Throwable e) { + throw new ServiceException(e); + } + return returnMessage; + } + + @Override + public void close() throws IOException { + if (!isClosed) { + isClosed = true; + CLIENTS.stopClient(client); + } + } + + private Message getReturnProtoType(Method method) throws Exception { + if (returnTypes.containsKey(method.getName())) { + return returnTypes.get(method.getName()); + } + + Class<?> returnType = method.getReturnType(); + Method newInstMethod = returnType.getMethod("getDefaultInstance"); + newInstMethod.setAccessible(true); + Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null); + returnTypes.put(method.getName(), prototype); + return prototype; + } + + @Override //RpcInvocationHandler + public ConnectionId getConnectionId() { + return remoteId; + } + } + + interface RpcWrapper extends Writable { + int getLength(); + } + /** + * Wrapper for Protocol Buffer Requests + * + * Note while this wrapper is writable, the request on the wire is in + * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} + * use type Writable as a wrapper to work across multiple RpcEngine kinds. + */ + private static abstract class RpcMessageWithHeader<T extends GeneratedMessage> + implements RpcWrapper { + T requestHeader; + Message theRequest; // for clientSide, the request is here + byte[] theRequestRead; // for server side, the request is here + + public RpcMessageWithHeader() { + } + + public RpcMessageWithHeader(T requestHeader, Message theRequest) { + this.requestHeader = requestHeader; + this.theRequest = theRequest; + } + + @Override + public void write(DataOutput out) throws IOException { + OutputStream os = DataOutputOutputStream.constructOutputStream(out); + + ((Message)requestHeader).writeDelimitedTo(os); + theRequest.writeDelimitedTo(os); + } + + @Override + public void readFields(DataInput in) throws IOException { + requestHeader = parseHeaderFrom(readVarintBytes(in)); + theRequestRead = readMessageRequest(in); + } + + abstract T parseHeaderFrom(byte[] bytes) throws IOException; + + byte[] readMessageRequest(DataInput in) throws IOException { + return readVarintBytes(in); + } + + private static byte[] readVarintBytes(DataInput in) throws IOException { + final int length = ProtoUtil.readRawVarint32(in); + final byte[] bytes = new byte[length]; + in.readFully(bytes); + return bytes; + } + + public T getMessageHeader() { + return requestHeader; + } + + public byte[] getMessageBytes() { + return theRequestRead; + } + + @Override + public int getLength() { + int headerLen = requestHeader.getSerializedSize(); + int reqLen; + if (theRequest != null) { + reqLen = theRequest.getSerializedSize(); + } else if (theRequestRead != null ) { + reqLen = theRequestRead.length; + } else { + throw new IllegalArgumentException( + "getLength on uninitialized RpcWrapper"); + } + return CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen + + CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen; + } + } + + private static class RpcRequestWrapper + extends RpcMessageWithHeader<RequestHeaderProto> { + @SuppressWarnings("unused") + public RpcRequestWrapper() {} + + public RpcRequestWrapper( + RequestHeaderProto requestHeader, Message theRequest) { + super(requestHeader, theRequest); + } + + @Override + RequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException { + return RequestHeaderProto.parseFrom(bytes); + } + + @Override + public String toString() { + return requestHeader.getDeclaringClassProtocolName() + "." + + requestHeader.getMethodName(); + } + } + + @InterfaceAudience.LimitedPrivate({"RPC"}) + public static class RpcRequestMessageWrapper + extends RpcMessageWithHeader<RpcRequestHeaderProto> { + public RpcRequestMessageWrapper() {} + + public RpcRequestMessageWrapper( + RpcRequestHeaderProto requestHeader, Message theRequest) { + super(requestHeader, theRequest); + } + + @Override + RpcRequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException { + return RpcRequestHeaderProto.parseFrom(bytes); + } + } + + @InterfaceAudience.LimitedPrivate({"RPC"}) + public static class RpcResponseMessageWrapper + extends RpcMessageWithHeader<RpcResponseHeaderProto> { + public RpcResponseMessageWrapper() {} + + public RpcResponseMessageWrapper( + RpcResponseHeaderProto responseHeader, Message theRequest) { + super(responseHeader, theRequest); + } + + @Override + byte[] readMessageRequest(DataInput in) throws IOException { + // error message contain no message body + switch (requestHeader.getStatus()) { + case ERROR: + case FATAL: + return null; + default: + return super.readMessageRequest(in); + } + } + + @Override + RpcResponseHeaderProto parseHeaderFrom(byte[] bytes) throws IOException { + return RpcResponseHeaderProto.parseFrom(bytes); + } + } + + /** + * Wrapper for Protocol Buffer Responses + * + * Note while this wrapper is writable, the request on the wire is in + * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} + * use type Writable as a wrapper to work across multiple RpcEngine kinds. + */ + @InterfaceAudience.LimitedPrivate({"RPC"}) // temporarily exposed + public static class RpcResponseWrapper implements RpcWrapper { + Message theResponse; // for senderSide, the response is here + byte[] theResponseRead; // for receiver side, the response is here + + public RpcResponseWrapper() { + } + + public RpcResponseWrapper(Message message) { + this.theResponse = message; + } + + @Override + public void write(DataOutput out) throws IOException { + OutputStream os = DataOutputOutputStream.constructOutputStream(out); + theResponse.writeDelimitedTo(os); + } + + @Override + public void readFields(DataInput in) throws IOException { + int length = ProtoUtil.readRawVarint32(in); + theResponseRead = new byte[length]; + in.readFully(theResponseRead); + } + + @Override + public int getLength() { + int resLen; + if (theResponse != null) { + resLen = theResponse.getSerializedSize(); + } else if (theResponseRead != null ) { + resLen = theResponseRead.length; + } else { + throw new IllegalArgumentException( + "getLength on uninitialized RpcWrapper"); + } + return CodedOutputStream.computeRawVarint32Size(resLen) + resLen; + } + } + + @VisibleForTesting + @InterfaceAudience.Private + @InterfaceStability.Unstable + static Client getClient(Configuration conf) { + return CLIENTS.getClient(conf, SocketFactory.getDefault(), + RpcResponseWrapper.class); + } + + + + @Override + public RPC.Server getServer(Class<?> protocol, Object protocolImpl, + String bindAddress, int port, int numHandlers, int numReaders, + int queueSizePerHandler, boolean verbose, Configuration conf, + SecretManager<? extends TokenIdentifier> secretManager, + String portRangeConfig) + throws IOException { + return new Server(protocol, protocolImpl, conf, bindAddress, port, + numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, + portRangeConfig); + } + + public static class Server extends RPC.Server { + /** + * Construct an RPC server. + * + * @param protocolClass the class of protocol + * @param protocolImpl the protocolImpl whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param verbose whether each call should be logged + * @param portRangeConfig A config parameter that can be used to restrict + * the range of ports used when port is 0 (an ephemeral port) + */ + public Server(Class<?> protocolClass, Object protocolImpl, + Configuration conf, String bindAddress, int port, int numHandlers, + int numReaders, int queueSizePerHandler, boolean verbose, + SecretManager<? extends TokenIdentifier> secretManager, + String portRangeConfig) + throws IOException { + super(bindAddress, port, null, numHandlers, + numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl + .getClass().getName()), secretManager, portRangeConfig); + this.verbose = verbose; + registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, + protocolImpl); + } + + /** + * Protobuf invoker for {@link RpcInvoker} + */ + static class ProtoBufRpcInvoker implements RpcInvoker { + private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, + String protoName, long clientVersion) throws RpcServerException { + ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion); + ProtoClassProtoImpl impl = + server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv); + if (impl == null) { // no match for Protocol AND Version + VerProtocolImpl highest = + server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + protoName); + if (highest == null) { + throw new RpcNoSuchProtocolException( + "Unknown protocol: " + protoName); + } + // protocol supported but not the version that client wants + throw new RPC.VersionMismatch(protoName, clientVersion, + highest.version); + } + return impl; + } + + @Override + /** + * This is a server side method, which is invoked over RPC. On success + * the return response has protobuf response payload. On failure, the + * exception name and the stack trace are return in the resposne. + * See {@link HadoopRpcResponseProto} + * + * In this method there three types of exceptions possible and they are + * returned in response as follows. + * <ol> + * <li> Exceptions encountered in this method that are returned + * as {@link RpcServerException} </li> + * <li> Exceptions thrown by the service is wrapped in ServiceException. + * In that this method returns in response the exception thrown by the + * service.</li> + * <li> Other exceptions thrown by the service. They are returned as + * it is.</li> + * </ol> + */ + public Writable call(RPC.Server server, String protocol, + Writable writableRequest, long receiveTime) throws Exception { + RpcRequestWrapper request = (RpcRequestWrapper) writableRequest; + RequestHeaderProto rpcRequest = request.requestHeader; + String methodName = rpcRequest.getMethodName(); + String protoName = rpcRequest.getDeclaringClassProtocolName(); + long clientVersion = rpcRequest.getClientProtocolVersion(); + if (server.verbose) + LOG.info("Call: protocol=" + protocol + ", method=" + methodName); + + ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName, + clientVersion); + BlockingService service = (BlockingService) protocolImpl.protocolImpl; + MethodDescriptor methodDescriptor = service.getDescriptorForType() + .findMethodByName(methodName); + if (methodDescriptor == null) { + String msg = "Unknown method " + methodName + " called on " + protocol + + " protocol."; + LOG.warn(msg); + throw new RpcNoSuchMethodException(msg); + } + Message prototype = service.getRequestPrototype(methodDescriptor); + Message param = prototype.newBuilderForType() + .mergeFrom(request.theRequestRead).build(); + + Message result; + long startTime = Time.now(); + int qTime = (int) (startTime - receiveTime); + Exception exception = null; + try { + server.rpcDetailedMetrics.init(protocolImpl.protocolClass); + result = service.callBlockingMethod(methodDescriptor, null, param); + } catch (ServiceException e) { + exception = (Exception) e.getCause(); + throw (Exception) e.getCause(); + } catch (Exception e) { + exception = e; + throw e; + } finally { + int processingTime = (int) (Time.now() - startTime); + if (LOG.isDebugEnabled()) { + String msg = "Served: " + methodName + " queueTime= " + qTime + + " procesingTime= " + processingTime; + if (exception != null) { + msg += " exception= " + exception.getClass().getSimpleName(); + } + LOG.debug(msg); + } + String detailedMetricsName = (exception == null) ? + methodName : + exception.getClass().getSimpleName(); + server.rpcMetrics.addRpcQueueTime(qTime); + server.rpcMetrics.addRpcProcessingTime(processingTime); + server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName, + processingTime); + } + return new RpcResponseWrapper(result); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java new file mode 100644 index 0000000..a50b938 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +public interface HadoopConstants { + String RAFT_SERVER_KERBEROS_PRINCIPAL_KEY + = "raft.server.kerberos.principal"; + String RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY + = "raft.client.kerberos.principal"; + String RAFT_SERVER_PROTOCOL_NAME + = "org.apache.hadoop.raft.server.protocol.RaftServerProtocol"; + String RAFT_CLIENT_PROTOCOL_NAME + = "org.apache.hadoop.raft.protocol.RaftClientProtocol"; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java new file mode 100644 index 0000000..22edef1 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngineShaded; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.Closeable; +import java.io.IOException; + +public class Proxy<PROTOCOL> implements Closeable { + public static <PROTOCOL> PROTOCOL getProxy( + Class<PROTOCOL> clazz, String addressStr, Configuration conf) + throws IOException { + RPC.setProtocolEngine(conf, clazz, ProtobufRpcEngineShaded.class); + return RPC.getProxy(clazz, RPC.getProtocolVersion(clazz), + org.apache.ratis.util.NetUtils.newInetSocketAddress(addressStr), + UserGroupInformation.getCurrentUser(), + conf, NetUtils.getSocketFactory(conf, clazz)); + } + + private final PROTOCOL protocol; + + public Proxy(Class<PROTOCOL> clazz, String addressStr, Configuration conf) + throws IOException { + this.protocol = getProxy(clazz, addressStr, conf); + } + + public PROTOCOL getProtocol() { + return protocol; + } + + @Override + public void close() { + RPC.stopProxy(protocol); + } +}
