This is an automated email from the ASF dual-hosted git repository.

rajeshbabu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 741d8bd  RATIS-556 Detect node failures and close the log to prevent 
additional writes(Rajeshbabu)
741d8bd is described below

commit 741d8bd4fbb1c9642dd054e72cae0c7afb180170
Author: Rajeshbabu Chintaguntla <Rajeshbabu Chintaguntla>
AuthorDate: Thu Sep 5 10:28:55 2019 +0530

    RATIS-556 Detect node failures and close the log to prevent additional 
writes(Rajeshbabu)
---
 .../apache/ratis/logservice/common/Constants.java  |   8 ++
 .../apache/ratis/logservice/server/LogServer.java  |  40 ++++++
 .../ratis/logservice/server/LogStateMachine.java   |  16 ++-
 .../ratis/logservice/server/MetaStateMachine.java  | 138 ++++++++++++++++++---
 .../ratis/logservice/server/MetadataServer.java    |  13 +-
 .../ratis/logservice/util/LogServiceProtoUtil.java |   2 +-
 .../logservice/util/MetaServiceProtoUtil.java      |  11 +-
 ratis-logservice/src/main/proto/LogService.proto   |   1 +
 ratis-logservice/src/main/proto/MetaService.proto  |   9 +-
 .../ratis/logservice/server/TestMetaServer.java    |  40 +++++-
 10 files changed, 244 insertions(+), 34 deletions(-)

diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
index be46177..7151341 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
@@ -58,4 +58,12 @@ public class Constants {
     public static final String RATIS_RAFT_SEGMENT_SIZE_KEY = 
"ratis.raft.segment.size";
     public static final long DEFAULT_RATIS_RAFT_SEGMENT_SIZE = 32 * 1024 
*1024;// 32MB
 
+    public static final String LOG_SERVICE_HEARTBEAT_INTERVAL_KEY =
+            "logservice.heartbeat.interval"; // in ms
+    public static final long DEFAULT_HEARTBEAT_INTERVAL = 3000;// 3 seconds
+
+    public static final String LOG_SERVICE_PEER_FAILURE_DETECTION_PERIOD_KEY =
+            "logservice.peer.failure.detection.period"; // in ms
+    public static final long DEFAULT_PEER_FAILURE_DETECTION_PERIOD = 60000;// 
1 min.
+
 }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index 50da769..6a3e926 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -40,6 +40,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -53,6 +54,8 @@ public class LogServer extends BaseServer {
     private RaftServer raftServer = null;
     private RaftClient metaClient = null;
 
+    private Daemon daemon =  null;
+    private long heartbeatInterval = Constants.DEFAULT_HEARTBEAT_INTERVAL;
     public LogServer(ServerOpts opts) {
       super(opts);
       LOG.debug("Log Server options: {}", opts);
@@ -84,6 +87,13 @@ public class LogServer extends BaseServer {
       if (archiveLocation != null) {
         properties.set(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY, 
archiveLocation);
       }
+      heartbeatInterval = 
getConfig().getLong(Constants.LOG_SERVICE_HEARTBEAT_INTERVAL_KEY,
+        Constants.DEFAULT_HEARTBEAT_INTERVAL);
+      if(heartbeatInterval <= 0) {
+          LOG.warn("Heartbeat interval configuration is invalid." +
+                  " Setting default value "+ 
Constants.DEFAULT_HEARTBEAT_INTERVAL);
+          heartbeatInterval = Constants.DEFAULT_HEARTBEAT_INTERVAL;
+      }
       RaftServerConfigKeys.Log.setSegmentSizeMax(properties, segmentSizeBytes);
       RaftServerConfigKeys.Log.setPreallocatedSize(properties, 
segmentSizeBytes);
 
@@ -136,6 +146,9 @@ public class LogServer extends BaseServer {
                 .setProperties(properties)
                 .build();
         metaClient.send(() -> 
MetaServiceProtoUtil.toPingRequestProto(peer).toByteString());
+        daemon = new Daemon(new HeartbeatSender(new 
RaftPeer(raftServer.getId())),
+                "heartbeat-Sender"+raftServer.getId());
+        daemon.start();
     }
 
     public static void main(String[] args) throws IOException {
@@ -164,6 +177,7 @@ public class LogServer extends BaseServer {
 
     public void close() throws IOException {
         raftServer.close();
+        daemon.interrupt();
     }
 
     public static class Builder extends BaseServer.Builder<LogServer> {
@@ -172,4 +186,30 @@ public class LogServer extends BaseServer {
             return new LogServer(getOpts());
         }
     }
+
+    private class HeartbeatSender implements Runnable {
+        // Sends a heartbeat for this peer through metaClient every heartbeatInterval ms.
+        private final RaftPeer peer;
+        public HeartbeatSender(RaftPeer peer) {
+            this.peer = peer;
+        }
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    metaClient.send(() -> MetaServiceProtoUtil.
+                            toHeartbeatRequestProto(peer).toByteString());
+                } catch (IOException e) {
+                    LOG.warn("Heartbeat request failed with exception", e);
+                }
+                // Sleep in a separate try so a failed send is throttled too (no hot retry loop).
+                try {
+                    Thread.sleep(heartbeatInterval);
+                } catch (InterruptedException e) {
+                    // Restore the flag and stop: interruption is the shutdown signal.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+    }
 }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index 6b631a0..1d8cbcb 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -532,28 +532,34 @@ public class LogStateMachine extends BaseStateMachine {
     // TODO need to handle exceptions while operating with files.
 
     State targetState = State.valueOf(changeState.getState().name());
+    Throwable t = null;
     //if forced skip checking states
     if(!changeState.getForce()) {
       switch (targetState) {
       case OPEN:
         if (state != null) {
-          verifyState(State.OPEN, State.CLOSED);
+          t = verifyState(State.OPEN, State.CLOSED);
         }
         break;
       case CLOSED:
-        verifyState(State.OPEN);
+        t =  verifyState(State.OPEN);
         break;
       case ARCHIVED:
-        verifyState(State.ARCHIVING);
+        t = verifyState(State.ARCHIVING);
         break;
       case ARCHIVING:
-        verifyState(State.CLOSED);
+        t = verifyState(State.CLOSED);
         break;
       case DELETED:
-        verifyState(State.CLOSED);
+        t = verifyState(State.CLOSED);
         break;
       }
     }
+    if(t != null) {
+      return CompletableFuture.completedFuture(Message
+              .valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().
+                      
setException(LogServiceProtoUtil.toLogException(t)).build().toByteString()));
+    }
     this.state = targetState;
     return CompletableFuture.completedFuture(Message
         
.valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().build().toByteString()));
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 5e0785b..e50b39c 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -19,11 +19,7 @@
 package org.apache.ratis.logservice.server;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -37,10 +33,13 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.logservice.api.LogInfo;
 import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.common.Constants;
 import org.apache.ratis.logservice.common.LogAlreadyExistException;
 import org.apache.ratis.logservice.common.LogNotFoundException;
 import org.apache.ratis.logservice.common.NoEnoughWorkersException;
 import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
 import org.apache.ratis.logservice.proto.MetaServiceProtos;
 import 
org.apache.ratis.logservice.proto.MetaServiceProtos.CreateLogRequestProto;
 import 
org.apache.ratis.logservice.proto.MetaServiceProtos.DeleteLogRequestProto;
@@ -53,18 +52,16 @@ import 
org.apache.ratis.logservice.util.MetaServiceProtoUtil;
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.metrics.impl.RatisMetricRegistryImpl;
 import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.*;
+
+
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,10 +86,15 @@ public class MetaStateMachine extends BaseStateMachine {
     // keep a copy of raftServer to get group information.
     private RaftServer raftServer;
 
+    private Map<RaftPeer, Set<LogName>> peerLogs = new ConcurrentHashMap<>();
+
+    private Map<RaftPeer, Long> heartbeatInfo = new ConcurrentHashMap<>();
 
     private RaftGroup currentGroup = null;
 
+    private Daemon peerHealthChecker = null;
     // MinHeap queue for load balancing groups across the peers
+    private long failureDetectionPeriod = 
Constants.DEFAULT_PEER_FAILURE_DETECTION_PERIOD;
     private PriorityBlockingQueue<PeerGroups> avail = new 
PriorityBlockingQueue<PeerGroups>();
 
     //Properties
@@ -104,9 +106,11 @@ public class MetaStateMachine extends BaseStateMachine {
     private RaftGroupId logServerGroupId;
     private RatisMetricRegistry metricRegistry;
 
-    public MetaStateMachine(RaftGroupId metadataGroupId, RaftGroupId 
logServerGroupId) {
+    public MetaStateMachine(RaftGroupId metadataGroupId, RaftGroupId 
logServerGroupId,
+                            long failureDetectionPeriod) {
       this.metadataGroupId = metadataGroupId;
       this.logServerGroupId = logServerGroupId;
+      this.failureDetectionPeriod = failureDetectionPeriod;
     }
 
     @Override
@@ -115,6 +119,8 @@ public class MetaStateMachine extends BaseStateMachine {
         this.metricRegistry = LogServiceMetricsRegistry
             
.createMetricRegistryForLogServiceMetaData(server.getId().toString());
         super.initialize(server, groupId, storage);
+        peerHealthChecker = new Daemon(new 
PeerHealthChecker(),"peer-Health-Checker");
+        peerHealthChecker.start();
     }
 
     @Override
@@ -131,7 +137,19 @@ public class MetaStateMachine extends BaseStateMachine {
                 LogServiceRegisterLogRequestProto r = req.getRegisterRequest();
                 LogName logname = 
LogServiceProtoUtil.toLogName(r.getLogname());
                 RaftGroup rg = 
MetaServiceProtoUtil.toRaftGroup(r.getRaftGroup());
+                rg.getPeers().stream().forEach(raftPeer -> {
+                    Set<LogName> logNames;
+                    if(!peerLogs.containsKey(raftPeer)) {
+                        logNames = new HashSet<>();
+                        peerLogs.put(raftPeer, logNames);
+                    } else {
+                        logNames = peerLogs.get(raftPeer);
+                    }
+                    logNames.add(logname);
+
+                });
                 map.put(logname, rg);
+
                 LOG.info("Log {} registered at {} with group {} ", logname, 
getId(), rg );
                 break;
             case UNREGISTERREQUEST:
@@ -147,9 +165,14 @@ public class MetaStateMachine extends BaseStateMachine {
                 } else {
                     peers.add(peer);
                     avail.add(new PeerGroups(peer));
+                    heartbeatInfo.put(peer,  System.currentTimeMillis());
                 }
                 break;
-
+            case HEARTBEATREQUEST:
+                MetaServiceProtos.LogServiceHeartbeatRequestProto 
heartbeatRequest = req.getHeartbeatRequest();
+                RaftPeer heartbeatPeer = 
MetaServiceProtoUtil.toRaftPeer(heartbeatRequest.getPeer());
+                heartbeatInfo.put(heartbeatPeer,  System.currentTimeMillis());
+                break;
             default:
         }
         return super.applyTransactionSerial(trx);
@@ -328,15 +351,14 @@ public class MetaStateMachine extends BaseStateMachine {
                         .build();
                 try {
                     client.send(() -> 
MetaServiceProtos.MetaSMRequestProto.newBuilder()
-                            
.setRegisterRequest(LogServiceRegisterLogRequestProto
-                                    .newBuilder()
+                            
.setRegisterRequest(LogServiceRegisterLogRequestProto.newBuilder()
                                     
.setLogname(LogServiceProtoUtil.toLogNameProto(name))
                                     .setRaftGroup(MetaServiceProtoUtil
                                             .toRaftGroupProto(raftGroup)))
                             .build().toByteString());
                 } catch (IOException e) {
                     LOG.error(
-                        "Exception while registring raft group with Metadata 
Service during creation of log");
+                        "Exception while registering raft group with Metadata 
Service during creation of log");
                     e.printStackTrace();
                 }
                 return 
CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
@@ -382,7 +404,6 @@ public class MetaStateMachine extends BaseStateMachine {
 
         public PeerGroups(RaftPeer peer) {
             this.peer = peer;
-
         }
 
         public Set<RaftGroup> getGroups () {
@@ -398,4 +419,87 @@ public class MetaStateMachine extends BaseStateMachine {
             return groups.size() - ((PeerGroups) o).groups.size();
         }
     }
+
+    private class PeerHealthChecker implements Runnable {
+        @Override
+        public void run() {
+            while(true) {
+                try {
+                    Thread.sleep(1000);
+                    long now = System.currentTimeMillis();
+                    heartbeatInfo.keySet().stream().forEach(raftPeer -> {
+                        Long heartbeatTimestamp = heartbeatInfo.get(raftPeer);
+                        // A peer is treated as failed once its last heartbeat is older than 
failureDetectionPeriod.
+                        if((now - heartbeatTimestamp) > 
failureDetectionPeriod) {
+                            // Close the logs served by this peer, if any.
+                            if (peerLogs.containsKey(raftPeer)) {
+                                LOG.warn("Closing all logs hosted by peer {} 
because last heartbeat" +
+                                                " ({}ms) exceeds the threshold 
({}ms)", raftPeer, now - heartbeatTimestamp,
+                                        failureDetectionPeriod);
+                                peers.remove(raftPeer);
+                                Set<LogName> logNames = peerLogs.get(raftPeer);
+                                Iterator<LogName> itr = logNames.iterator();
+                                while(itr.hasNext()) {
+                                    LogName logName = itr.next();
+                                    RaftGroup group = map.get(logName);
+                                    RaftClient client = 
RaftClient.newBuilder().
+                                            
setRaftGroup(group).setProperties(properties).build();
+                                    try {
+                                        LOG.warn(String.format("Peer %s in the 
group %s went down." +
+                                                        " Hence closing the 
log %s serve by the group.",
+                                                raftPeer.toString(), 
group.toString(), logName.toString()));
+                                        RaftClientReply reply = client.send(
+                                                () -> LogServiceProtoUtil.
+                                                        
toChangeStateRequestProto(logName, LogStream.State.CLOSED, true)
+                                                        .toByteString());
+                                        LogServiceProtos.ChangeStateReplyProto 
message =
+                                                
LogServiceProtos.ChangeStateReplyProto.parseFrom(reply.getMessage().getContent());
+                                        if(message.hasException()) {
+                                            throw new 
IOException(message.getException().getErrorMsg());
+                                        }
+                                        itr.remove();
+                                        client.close();
+                                    } catch (IOException e) {
+                                        LOG.warn(String.format("Failed to 
close log %s on peer %s failure.",
+                                                logName, raftPeer.toString()), 
e);
+                                    }
+                                }
+                                if(logNames.isEmpty()) {
+                                    peerLogs.remove(raftPeer);
+                                    heartbeatInfo.remove(raftPeer);
+                                } // else retry closing failed logs on next 
period.
+                            }
+                            final List<PeerGroups> peerGroupsToRemove = new 
ArrayList<>();
+                            // Remove this peer's entries from the avail load-balancing queue.
+                            avail.stream().forEach(peerGroup -> {
+                                if(peerGroup.getPeer().equals(raftPeer)) {
+                                    peerGroupsToRemove.add(peerGroup);
+                                }
+                            });
+                            for(PeerGroups peerGroups: peerGroupsToRemove) {
+                                avail.remove(peerGroups);
+                            }
+                        }
+                    });
+                } catch (Exception e) {
+                    LOG.error(
+                            "Exception while closing logs and removing peer" +
+                                    " from raft groups with Metadata Service 
on node failure", e);
+                }
+            }
+        }
+    }
+
+
+    // For testing only: reports whether peers, peerLogs, heartbeatInfo and avail
+    // all track exactly the same set of peers.
+    public boolean checkPeersAreSame() {
+        Set<RaftPeer> availPeers = new HashSet<>();
+        for (PeerGroups peerGroups : avail) {
+            availPeers.add(peerGroups.getPeer());
+        }
+        return peers.equals(peerLogs.keySet())
+                && peers.equals(heartbeatInfo.keySet())
+                && peers.equals(availPeers);
+    }
 }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
index 3bd75a7..9c914ea 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
@@ -51,7 +51,7 @@ public class MetadataServer extends BaseServer {
 
     private String id;
 
-    private StateMachine metaStateMachine;
+    StateMachine metaStateMachine;
 
     private LifeCycle lifeCycle;
 
@@ -82,11 +82,14 @@ public class MetadataServer extends BaseServer {
 
         // Set properties common to all log service state machines
         setRaftProperties(properties);
-
+        long failureDetectionPeriod = getConfig().
+                
getLong(Constants.LOG_SERVICE_PEER_FAILURE_DETECTION_PERIOD_KEY,
+                Constants.DEFAULT_PEER_FAILURE_DETECTION_PERIOD);
         Set<RaftPeer> peers = getPeersFromQuorum(opts.getMetaQuorum());
         RaftGroupId raftMetaGroupId = 
RaftGroupId.valueOf(opts.getMetaGroupId());
         RaftGroup metaGroup = RaftGroup.valueOf(raftMetaGroupId, peers);
-        metaStateMachine = new MetaStateMachine(raftMetaGroupId, 
RaftGroupId.valueOf(opts.getLogServerGroupId()));
+        metaStateMachine = new MetaStateMachine(raftMetaGroupId, 
RaftGroupId.valueOf(opts.getLogServerGroupId()),
+                failureDetectionPeriod);
 
         // Make sure that we aren't setting any invalid/harmful properties
         validateRaftProperties(properties);
@@ -159,4 +162,8 @@ public class MetadataServer extends BaseServer {
             return new MetadataServer(getOpts());
         }
     }
+
+    public RaftServer getServer() {
+        return server;
+    }
 }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
index 8b8356e..89b53cb 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -61,7 +61,7 @@ public class LogServiceProtoUtil {
     LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
     ChangeStateLogRequestProto changeLog =
         ChangeStateLogRequestProto.newBuilder().setLogName(logNameProto)
-            .setState(LogStreamState.valueOf(state.name())).build();
+            
.setState(LogStreamState.valueOf(state.name())).setForce(force).build();
     return 
LogServiceRequestProto.newBuilder().setChangeState(changeLog).build();
   }
 
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
index 90227b1..8384d23 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
@@ -24,6 +24,7 @@ import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.proto.LogServiceProtos;
 import org.apache.ratis.logservice.proto.MetaServiceProtos;
 import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.ReflectionUtils;
@@ -81,6 +82,15 @@ public class MetaServiceProtoUtil {
                                 
.setPeer(MetaServiceProtoUtil.toRaftPeerProto(peer)).build()).build();
     }
 
+    public static MetaSMRequestProto toHeartbeatRequestProto(RaftPeer peer) {
+        // Wrap the peer in a heartbeat message, then in the state-machine request envelope.
+        LogServiceHeartbeatRequestProto heartbeat =
+                LogServiceHeartbeatRequestProto.newBuilder()
+                        .setPeer(toRaftPeerProto(peer))
+                        .build();
+        return MetaSMRequestProto.newBuilder()
+                .setHeartbeatRequest(heartbeat).build();
+    }
+
     public static MetaServiceRequestProto toCreateLogRequestProto(LogName 
logName) {
         LogServiceProtos.LogNameProto logNameProto = 
LogServiceProtos.LogNameProto.newBuilder()
                 .setName(logName.getName())
@@ -193,7 +203,6 @@ public class MetaServiceProtoUtil {
         };
     }
 
-
     public static DeleteLogReplyProto toDeleteLogReplyProto() {
         return DeleteLogReplyProto.newBuilder().build();
     }
diff --git a/ratis-logservice/src/main/proto/LogService.proto 
b/ratis-logservice/src/main/proto/LogService.proto
index 5c06f95..eb27842 100644
--- a/ratis-logservice/src/main/proto/LogService.proto
+++ b/ratis-logservice/src/main/proto/LogService.proto
@@ -61,6 +61,7 @@ message GetStateRequestProto {
 }
 
 message ChangeStateReplyProto {
+  LogServiceException exception = 1;
 }
 
 message GetStateReplyProto {
diff --git a/ratis-logservice/src/main/proto/MetaService.proto 
b/ratis-logservice/src/main/proto/MetaService.proto
index 16e0232..eef10e4 100644
--- a/ratis-logservice/src/main/proto/MetaService.proto
+++ b/ratis-logservice/src/main/proto/MetaService.proto
@@ -106,6 +106,13 @@ message LogServicePingRequestProto {
   RaftPeerProto peer = 1;
 }
 
+message LogServiceHeartbeatRequestProto {
+  RaftPeerProto peer = 1;
+}
+
+message LogServiceHeartbeatReplyProto {
+}
+
 // Internal StateMachine change request
 // includes: all operations with workers and raft groups.
 message MetaSMRequestProto {
@@ -113,7 +120,7 @@ message MetaSMRequestProto {
     LogServicePingRequestProto pingRequest = 1;
     LogServiceRegisterLogRequestProto registerRequest = 2;
     LogServiceUnregisterLogRequestProto unregisterRequest = 3;
-
+    LogServiceHeartbeatRequestProto heartbeatRequest = 4;
   }
 }
 
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 886f058..e30a8c4 100644
--- 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -27,6 +27,7 @@ import 
org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
 import org.apache.ratis.logservice.proto.MetaServiceProtos;
 import org.apache.ratis.logservice.util.LogServiceCluster;
 import org.apache.ratis.logservice.util.TestUtils;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.junit.AfterClass;
@@ -42,17 +43,15 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
 import javax.management.InstanceNotFoundException;
 import javax.management.ObjectName;
 
+import static org.junit.Assert.*;
+
 public class TestMetaServer {
 
     static LogServiceCluster cluster = null;
+    static List<LogServer> workers = null;
     static AtomicInteger createCount = new AtomicInteger();
     static AtomicInteger deleteCount = new AtomicInteger();
     static AtomicInteger listCount = new AtomicInteger();
@@ -77,7 +76,7 @@ public class TestMetaServer {
     public static void beforeClass() {
         cluster = new LogServiceCluster(3);
         cluster.createWorkers(3);
-        List<LogServer> workers = cluster.getWorkers();
+        workers = cluster.getWorkers();
         assert(workers.size() == 3);
     }
 
@@ -101,6 +100,35 @@ public class TestMetaServer {
         assertNotNull(logStream2);
     }
 
+    /**
+     * Test that the logs hosted by a peer are closed after that peer fails.
+     * @throws Exception if creating or reading a log fails, or the wait is interrupted
+     */
+    @Test
+    public void testCloseLogOnNodeFailure() throws Exception {
+        boolean peerClosed = false;
+        try {
+            for(int i = 0; i < 5; i++) {
+                LogStream logStream1 = 
client.createLog(LogName.of("testCloseLogOnNodeFailure"+i));
+                assertNotNull(logStream1);
+            }
+            
assertTrue(((MetaStateMachine)cluster.getMasters().get(0).metaStateMachine).checkPeersAreSame());
+            workers.get(0).close();
+            peerClosed = true;
+            Thread.sleep(90000);
+            
assertTrue(((MetaStateMachine)cluster.getMasters().get(0).metaStateMachine).checkPeersAreSame());
+            for(int i = 0; i < 5; i++) {
+                LogStream logStream2 = 
client.getLog(LogName.of("testCloseLogOnNodeFailure"+i));
+                assertNotNull(logStream2);
+                assertEquals(State.CLOSED, logStream2.getState());
+            }
+        } finally {
+            if(peerClosed) {
+                // recreate the worker closed in the test.
+                cluster.createWorkers(1);
+            }
+        }
+    }
 
     @Test
     public void testReadWritetoLog() throws IOException, InterruptedException {

Reply via email to