This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c2a6a24  RATIS-588 LogStream StateMachine export
c2a6a24 is described below

commit c2a6a24a9ff85cc24132f5d5db40391bd36fbe70
Author: Ankit Singhal <[email protected]>
AuthorDate: Wed Jun 12 10:57:50 2019 -0700

    RATIS-588 LogStream StateMachine export
    
    Closes #25
    
    Signed-off-by: Josh Elser <[email protected]>
---
 ratis-logservice/pom.xml                           |  39 +++
 .../ratis/logservice/api/ArchiveLogReader.java     |  27 ++
 .../ratis/logservice/api/ArchiveLogWriter.java     |  45 +++
 .../org/apache/ratis/logservice/api/LogReader.java |   4 +-
 .../org/apache/ratis/logservice/api/LogStream.java |  24 +-
 .../ratis/logservice/client/LogServiceClient.java  | 120 ++++++-
 .../apache/ratis/logservice/common/Constants.java  |   1 +
 .../logservice/impl/ArchiveHdfsLogReader.java      | 215 ++++++++++++
 .../logservice/impl/ArchiveHdfsLogWriter.java      | 112 +++++++
 .../logservice/impl/ArchivedLogStreamImpl.java     | 138 ++++++++
 .../logservice/impl/ExportedLogStreamImpl.java     |  35 ++
 .../ratis/logservice/impl/LogStreamImpl.java       |  12 +-
 .../ratis/logservice/server/ArchivalInfo.java      |  95 ++++++
 .../apache/ratis/logservice/server/LogServer.java  |   6 +-
 .../logservice/server/LogServiceRaftLogReader.java |  14 +-
 .../ratis/logservice/server/LogStateMachine.java   | 369 +++++++++++++++++++--
 .../ratis/logservice/server/MetaStateMachine.java  |  16 +-
 .../ratis/logservice/server/RaftLogReader.java     |  51 +++
 .../ratis/logservice/shell/CommandFactory.java     |   4 +
 .../shell/commands/ArchiveLogCommand.java          |  47 +++
 .../shell/commands/ExportLogCommand.java           |  49 +++
 .../ratis/logservice/util/LogServiceProtoUtil.java |  61 +++-
 .../ratis/logservice/util/LogServiceUtils.java     |  20 ++
 .../logservice/util/MetaServiceProtoUtil.java      |   9 -
 ratis-logservice/src/main/proto/LogService.proto   |  44 ++-
 ratis-logservice/src/main/proto/MetaService.proto  |   8 -
 .../ratis/logservice/LogServiceReadWriteBase.java  |   7 +-
 .../impl/TestArchiveHdfsLogReaderAndWriter.java    | 153 +++++++++
 .../ratis/logservice/server/TestMetaServer.java    | 100 +++++-
 .../logservice/util/TestLogServiceProtoUtil.java   |   7 +-
 ratis-logservice/src/test/resources/logservice.xml |  28 ++
 .../ratis/statemachine/impl/BaseStateMachine.java  |   4 +-
 32 files changed, 1747 insertions(+), 117 deletions(-)

diff --git a/ratis-logservice/pom.xml b/ratis-logservice/pom.xml
index f180024..c1f151c 100644
--- a/ratis-logservice/pom.xml
+++ b/ratis-logservice/pom.xml
@@ -181,6 +181,14 @@
       <artifactId>ratis-netty</artifactId>
       <groupId>org.apache.ratis</groupId>
     </dependency>
+
+    <!-- Hadoop dependencies -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+
     <!-- CLI dependencies -->
     <dependency>
       <groupId>com.beust</groupId>
@@ -253,5 +261,36 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <!-- Needed for Archive log reader and writer testing-->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
   </dependencies>
 </project>
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogReader.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogReader.java
new file mode 100644
index 0000000..1eb1b4f
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogReader.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.api;
+
+import org.apache.ratis.logservice.server.RaftLogReader;
+/**
+ * Reader for archived logs. Implementations are not thread-safe
+ *
+ */
+public interface ArchiveLogReader extends RaftLogReader, LogReader {
+
+}
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogWriter.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogWriter.java
new file mode 100644
index 0000000..98038a1
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogWriter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.api;
+
+import java.io.IOException;
+
+public interface ArchiveLogWriter extends LogWriter{
+
+  /**
+   * Initializes the writer
+   * @param location archival location
+   * @param logName name of the log being archived
+   * @throws IOException if the writer cannot be initialized
+   */
+  void init(String location, LogName logName) throws IOException;
+
+  /**
+   * Rolls writer after number of records written crosses threshold
+   * {@link 
org.apache.ratis.logservice.server.LogStateMachine#DEFAULT_ARCHIVE_THRESHOLD_PER_FILE}
+   *
+   * @throws IOException
+   */
+  void rollWriter() throws IOException;
+
+  /**
+   * Returns the record id of the last written record.
+   * @return the record id of the last written record
+   */
+  long getLastWrittenRecordId() throws IOException;
+}
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
index b3bb618..2119fab 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
@@ -29,7 +29,7 @@ public interface LogReader extends AutoCloseable {
   /**
    * Seeks to the position before the record at the provided {@code recordId} 
in the LogStream.
    *
-   * @param offset A non-negative, offset in the LogStream
+   * @param recordId A non-negative record id (offset) in the LogStream
    * @return A future for when the operation is completed.
    */
   void seek(long recordId) throws IOException;
@@ -79,7 +79,7 @@ public interface LogReader extends AutoCloseable {
   /**
    * Returns the current position of this Reader. The position is a {@code 
recordId}.
    */
-  long getPosition();
+  long getPosition() throws IOException;
 
   /**
    * Overrides {@link #close()} in {@link AutoCloseable} to throw an 
IOException.
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
index f1123af..38d4159 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
@@ -31,8 +31,26 @@ public interface LogStream extends AutoCloseable{
    * An enumeration that defines the current state of a LogStream
    */
   public enum State {
+    /*
+    Log is open to receive write/read requests
+     */
     OPEN,
-    CLOSED;
+    /*
+    Log is closed and cannot be written to, but is available for read, archival or 
export
+     */
+    CLOSED,
+    /*
+    Log is currently being archived but is still readable and can be exported to a 
different location
+     */
+    ARCHIVING,
+    /*
+    Log is archived and available for read only
+     */
+    ARCHIVED,
+    /*
+    Log is deleted so not available for read or write or any other operation
+     */
+    DELETED;
   }
 
   /**
@@ -43,7 +61,7 @@ public interface LogStream extends AutoCloseable{
   /**
    * Returns the current state of this log.
    */
-  State getState();
+  State getState() throws IOException;
 
   /**
    * Returns the size of this LogStream in bytes.
@@ -62,7 +80,7 @@ public interface LogStream extends AutoCloseable{
    *
    * @return A synchronous reader
    */
-  LogReader createReader();
+  LogReader createReader() throws IOException;
 
   /**
    * Creates a write to write to this LogStream.
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
index 7896249..05b9292 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
@@ -26,8 +26,13 @@ import 
org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
 import org.apache.ratis.logservice.common.Constants;
+import org.apache.ratis.logservice.impl.ArchivedLogStreamImpl;
+import org.apache.ratis.logservice.impl.ExportedLogStreamImpl;
 import org.apache.ratis.logservice.impl.LogStreamImpl;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
 import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
+import org.apache.ratis.logservice.server.ArchivalInfo;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
 import org.apache.ratis.protocol.*;
 
@@ -85,8 +90,9 @@ public class LogServiceClient implements AutoCloseable {
      */
     public LogStream createLog(LogName logName) throws IOException {
         RaftClientReply reply = client.sendReadOnly(
-                () -> 
MetaServiceProtoUtil.toCreateLogRequestProto(logName).toByteString());
-        CreateLogReplyProto message = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
+            () -> 
MetaServiceProtoUtil.toCreateLogRequestProto(logName).toByteString());
+        CreateLogReplyProto message =
+            CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
         if (message.hasException()) {
             throw 
MetaServiceProtoUtil.toMetaServiceException(message.getException());
         }
@@ -101,16 +107,46 @@ public class LogServiceClient implements AutoCloseable {
      * @throws IOException
      */
     public LogStream getLog(LogName logName) throws IOException {
-        RaftClientReply reply = client.sendReadOnly
-                (() -> 
MetaServiceProtoUtil.toGetLogRequestProto(logName).toByteString());
-        GetLogReplyProto message = 
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
-        if(message.hasException()) {
-            throw 
MetaServiceProtoUtil.toMetaServiceException(message.getException());
-        }
-        LogInfo info = MetaServiceProtoUtil.toLogInfo(message.getLog());
-        return new LogStreamImpl(logName, getRaftClient(info), config);
+        return new LogStreamImpl(logName, getRaftClient(getLogInfo(logName)), 
config);
+    }
+
+    /**
+     * Gets the archived version of a log.
+     * @param logName the name of the log to get
+     * @return a {@link LogStream} over the archived log
+     * @throws IOException
+     */
+    public LogStream getArchivedLog(LogName logName) throws IOException {
+        return new ArchivedLogStreamImpl(logName, config);
     }
 
+    /**
+     * Gets an exported log.
+     * @param logName the name of the log to get
+     * @param location location of the exported log
+     * @return a {@link LogStream} over the log exported at {@code location}
+     * @throws IOException
+     */
+
+    public LogStream getExportLog(LogName logName, String location) throws 
IOException {
+        return new ExportedLogStreamImpl(logName, location);
+    }
+
+    public List<ArchivalInfo> getExportStatus(LogName logName) throws 
IOException {
+        try (RaftClient client = getRaftClient(getLogInfo(logName))) {
+            RaftClientReply exportInfoReply = client.sendReadOnly(
+                () -> 
LogServiceProtoUtil.toExportInfoRequestProto(logName).toByteString());
+            LogServiceProtos.GetExportInfoReplyProto message =
+                LogServiceProtos.GetExportInfoReplyProto
+                    .parseFrom(exportInfoReply.getMessage().getContent());
+            if (message.hasException()) {
+                throw new IOException(message.getException().getErrorMsg());
+            }
+            return message.getInfoList().stream()
+                .map(infoProto -> LogServiceProtoUtil.toExportInfo(infoProto))
+                .collect(Collectors.toList());
+        }
+    }
 
     public void deleteLog(LogName logName) throws IOException {
         RaftClientReply reply = client.sendReadOnly
@@ -149,20 +185,62 @@ public class LogServiceClient implements AutoCloseable {
      * @param logInfo
      * @return
      */
-    private RaftClient getRaftClient(LogInfo logInfo) {
+    private RaftClient getRaftClient(LogInfo logInfo) throws IOException {
+
         RaftProperties properties = new RaftProperties();
         return 
RaftClient.newBuilder().setRaftGroup(logInfo.getRaftGroup()).setProperties(properties).build();
 
     }
 
+    private LogInfo getLogInfo(LogName logName) throws IOException {
+        RaftClientReply reply = client.sendReadOnly(
+            () -> 
MetaServiceProtoUtil.toGetLogRequestProto(logName).toByteString());
+        GetLogReplyProto message = 
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
+        if (message.hasException()) {
+            throw 
MetaServiceProtoUtil.toMetaServiceException(message.getException());
+        }
+        return MetaServiceProtoUtil.toLogInfo(message.getLog());
+    }
+
     /**
-     * Archives the given log out of the state machine and into a configurable 
long-term storage. A log must be
-     * in {@link State#CLOSED} to archive it.
+     * Archives the given log out of the state machine and into a configurable 
long-term storage.
+     * A log must be in {@link State#CLOSED} to archive it.
+     * Archiving of the log will happen asynchronously from the client.
+     * The call will return immediately after adding a request for archiving 
the log
+     * to the respective quorum.
      *
-     * @param name The name of the log to archive.
+     * Client can check the status of Archiving by calling getState() Method
+     *
+     * @param logName The name of the log to archive.
+     */
+    public void archiveLog(LogName logName) throws IOException {
+        exportLog(logName, null, 0);
+    }
+
+    /**
+     * Exports the given log out of the state machine and into a provided 
location on the configured storage.
+     * A log must be in {@link State#CLOSED} to export it.
+     * Exporting of the log will happen asynchronously from the client.
+     * The call will return immediately after adding a request for exporting 
the log
+     * to the respective quorum.
+     *
+     * Client can check the status of export by calling getState() method
+     *
+     * @param logName The name of the log to export.
      */
-    void archiveLog(LogName name) throws IOException {
-      // TODO: write me
+    public void exportLog(LogName logName, String location, long recordId) 
throws IOException {
+        try (RaftClient client = getRaftClient(getLogInfo(logName))) {
+            RaftClientReply archiveLogReply = client.sendReadOnly(() -> 
LogServiceProtoUtil
+                .toArchiveLogRequestProto(logName, location, recordId,
+                    location == null ? true : false, 
ArchivalInfo.ArchivalStatus.SUBMITTED)
+                .toByteString());
+            LogServiceProtos.ArchiveLogReplyProto archiveMessage =
+                LogServiceProtos.ArchiveLogReplyProto
+                    .parseFrom(archiveLogReply.getMessage().getContent());
+            if (archiveMessage.hasException()) {
+                throw new 
IOException(archiveMessage.getException().getErrorMsg());
+            }
+        }
     }
 
     /**
@@ -172,8 +250,14 @@ public class LogServiceClient implements AutoCloseable {
      * @param name The name of the log to close
      */
     // TODO this name sucks, confusion WRT the Java Closeable interface.
-    void closeLog(LogName name) throws IOException {
-      //TODO: write me
+    public void closeLog(LogName name) throws IOException {
+        try (RaftClient client = getRaftClient(getLogInfo(name))) {
+            RaftClientReply reply = client.send(
+                () -> LogServiceProtoUtil.toChangeStateRequestProto(name, 
State.CLOSED)
+                    .toByteString());
+            LogServiceProtos.ChangeStateReplyProto message =
+                
LogServiceProtos.ChangeStateReplyProto.parseFrom(reply.getMessage().getContent());
+        }
     }
 
     /**
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
index 7366514..be46177 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
@@ -40,6 +40,7 @@ public class Constants {
     public static final String LOG_SERVICE_META_SERVER_GROUPID_KEY =
         "logservice.metaserver.groupid";
     public static final String LOG_SERVICE_LOG_SERVER_GROUPID_KEY = 
"logservice.logserver.groupid";
+    public static final String LOG_SERVICE_ARCHIVAL_LOCATION_KEY = 
"logservice.archival.location";
     /*
      * Raft properties
      */
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogReader.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogReader.java
new file mode 100644
index 0000000..1d5a403
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogReader.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ratis.logservice.api.ArchiveLogReader;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArchiveHdfsLogReader implements ArchiveLogReader {
+  public static final Logger LOG = 
LoggerFactory.getLogger(ArchiveHdfsLogReader.class);
+  private long fileLength;
+  private List<FileStatus> files;
+  private FileSystem hdfs;
+  private FSDataInputStream is;
+  private byte[] currentRecord;
+  private int fileCounter = 0;
+  private int currentRecordId;
+
+  public ArchiveHdfsLogReader(String archiveLocation) throws IOException {
+    this(new Configuration(), archiveLocation);
+  }
+
+  public ArchiveHdfsLogReader(Configuration configuration, String 
archiveLocation)
+      throws IOException {
+    this.hdfs = FileSystem.get(configuration);
+    Path archiveLocationPath = new Path(archiveLocation);
+    if (!hdfs.exists(archiveLocationPath)) {
+      throw new FileNotFoundException(archiveLocation);
+    }
+    files = Arrays.asList(hdfs.listStatus(archiveLocationPath));
+    if (files.size() > 0) {
+      Collections.sort(files, new Comparator<FileStatus>() {
+        @Override public int compare(FileStatus o1, FileStatus o2) {
+          //ascending order
+          //currently written file (without _recordId_) will be sorted at the 
last
+          return LogServiceUtils.getRecordIdFromRolledArchiveFile(o1.getPath())
+              
.compareTo(LogServiceUtils.getRecordIdFromRolledArchiveFile(o2.getPath()));
+        }
+      });
+      openNextFilePath();
+      loadNext();
+    }
+  }
+
+  private Path openNextFilePath() throws IOException {
+    Path filePath = files.get(fileCounter).getPath();
+    this.is = this.hdfs.open(filePath);
+    this.fileLength = this.hdfs.getFileStatus(filePath).getLen();
+    fileCounter++;
+    return filePath;
+
+  }
+
+  @Override public void seek(long recordId) throws IOException {
+    while (currentRecordId < recordId && hasNext()) {
+      next();
+    }
+  }
+
+  @Override public boolean hasNext() throws IOException {
+    return currentRecord != null;
+  }
+
+  @Override public byte[] next() throws IOException {
+    byte[] current = currentRecord;
+    currentRecord = null;
+    if (current != null) {
+      currentRecordId++;
+    }
+    loadNext();
+    return current;
+  }
+
+  @Override public long getCurrentRaftIndex() {
+    throw new UnsupportedOperationException(
+        "getCurrentRaftIndex() is not supported for archive hdfs log reader");
+  }
+
+  @Override public ByteBuffer readNext() throws IOException {
+    byte[] current = next();
+    if (current == null) {
+      throw new NoSuchElementException();
+    }
+    return ByteBuffer.wrap(current);
+  }
+
+  private int readLength() throws IOException {
+    int length;
+    try {
+      length = is.readInt();
+    } catch (EOFException e) {
+      if (files.size() <= fileCounter) {
+        LOG.trace("EOF and no more file to read, throwing back", e);
+        throw e;
+      } else {
+        LOG.trace("EOF.. Opening next file: {}!!", 
files.get(fileCounter).getPath());
+        openNextFilePath();
+        length = is.readInt();
+      }
+    }
+    return length;
+  }
+
+  @Override public void readNext(ByteBuffer buffer) throws IOException {
+    Preconditions.checkNotNull(buffer, "buffer is NULL");
+    byte[] current = next();
+    if (current == null) {
+      throw new NoSuchElementException();
+    }
+    buffer.put(current);
+  }
+
+
+  @Override public List<ByteBuffer> readBulk(int numRecords) throws 
IOException {
+    Preconditions.checkArgument(numRecords > 0, "number of records must be 
greater than 0");
+    List<ByteBuffer> ret = new ArrayList<ByteBuffer>();
+    try {
+
+      for (int i = 0; i < numRecords; i++) {
+        ByteBuffer buffer = readNext();
+        ret.add(buffer);
+      }
+
+    } catch (EOFException eof) {
+    } catch (Exception e) {
+      throw new IOException(e);
+    } finally {
+      return ret;
+    }
+  }
+
+  @Override public int readBulk(ByteBuffer[] buffers) throws IOException {
+    Preconditions.checkNotNull(buffers, "list of buffers is NULL");
+    Preconditions.checkArgument(buffers.length > 0, "list of buffers is 
empty");
+    int count = 0;
+    try {
+      for (int i = 0; i < buffers.length; i++) {
+        readNext(buffers[i]);
+        count++;
+      }
+    } catch (EOFException eof) {
+
+    }
+    return count;
+  }
+
+  @Override public long getPosition() throws IOException {
+    return currentRecordId;
+  }
+
+  @Override public void close() throws IOException {
+    if (this.is != null) {
+      this.is.close();
+      this.is = null;
+    }
+  }
+
+  private void loadNext() throws IOException {
+    int length;
+    try {
+      length = readLength();
+    } catch (EOFException e) {
+      currentRecord = null;
+      return;
+    }
+    byte[] bytes = new byte[length];
+    if (is.read(bytes) != length) {
+      throw new EOFException(
+          "File seems to be corrupted, Encountered EOF before reading the 
complete record");
+    }
+    currentRecord = bytes;
+  }
+
+  //Only for testing
+  public List<FileStatus> getFiles(){
+    return files;
+  }
+
+}
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogWriter.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogWriter.java
new file mode 100644
index 0000000..e91cd86
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogWriter.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ratis.logservice.api.ArchiveLogWriter;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.apache.ratis.util.LogUtils;
+
+public class ArchiveHdfsLogWriter implements ArchiveLogWriter {
+  private final Configuration configuration;
+  private FileSystem hdfs;
+  private FSDataOutputStream os;
+  private Path currentPath;
+  private long currentRecordId;
+  private long lastRollRecordId;
+
+  public ArchiveHdfsLogWriter(Configuration conf) {
+    this.configuration = conf;
+  }
+
+  public ArchiveHdfsLogWriter() {
+    this.configuration = new Configuration();
+  }
+
+  @Override public void init(String archiveLocation, LogName logName) throws 
IOException {
+    hdfs = FileSystem.get(configuration);
+    Path loc = new 
Path(LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
+    if (!hdfs.exists(loc)) {
+      hdfs.mkdirs(loc);
+    }
+    currentPath = new Path(loc, logName.getName());
+    os = hdfs.create(currentPath, true);
+  }
+
+  @Override public long write(ByteBuffer buffer) throws IOException {
+    if (buffer.hasArray()) {
+      int startIndex = buffer.arrayOffset();
+      int curIndex = buffer.arrayOffset() + buffer.position();
+      int endIndex = curIndex + buffer.remaining();
+      int length = endIndex - startIndex;
+      os.writeInt(length);
+      os.write(buffer.array(), startIndex, length);
+    } else {
+      throw new IllegalArgumentException(
+          "Currently array backed byte buffer is only supported for archive 
write !!");
+    }
+    currentRecordId++;
+    return currentRecordId;
+  }
+
+  @Override public List<Long> write(List<ByteBuffer> records) throws 
IOException {
+    List<Long> list = new ArrayList<Long>();
+    for (ByteBuffer record : records) {
+      list.add(write(record));
+    }
+    return list;
+  }
+
+  @Override public long sync() throws IOException {
+    return 0;
+  }
+
+  @Override public void close() throws IOException {
+    os.close();
+    if (lastRollRecordId != currentRecordId) {
+      hdfs.rename(currentPath, new Path(currentPath + "_recordId_" + 
currentRecordId));
+    }
+  }
+
+  @Override public void rollWriter() throws IOException {
+    if (lastRollRecordId != currentRecordId) {
+      //close old file
+      os.close();
+      hdfs.rename(currentPath,
+          new Path(LogServiceUtils.getRolledPathForArchiveWriter(currentPath, 
currentRecordId)));
+      lastRollRecordId = currentRecordId;
+      //create new file
+      os = hdfs.create(currentPath, true);
+    }
+  }
+
+  @Override public long getLastWrittenRecordId() throws IOException {
+    os.hflush();
+    return currentRecordId;
+  }
+
+}
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchivedLogStreamImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchivedLogStreamImpl.java
new file mode 100644
index 0000000..682d6c1
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchivedLogStreamImpl.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.logservice.api.ArchiveLogReader;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogWriter;
+import org.apache.ratis.logservice.api.RecordListener;
+import org.apache.ratis.logservice.common.Constants;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link LogStream} view over a log whose records live in an archive
+ * location. Only reading (via {@link #createReader()}), the name, the state
+ * and the configuration are available; every other operation throws
+ * {@link UnsupportedOperationException}.
+ */
+public class ArchivedLogStreamImpl implements LogStream {
+  public static final Logger LOG = LoggerFactory.getLogger(ArchivedLogStreamImpl.class);
+  /**
+   * Directory of the archived files
+   */
+  private String location;
+
+  /**
+   * Log stream name
+   */
+  LogName name;
+  /**
+   * Log stream configuration; stays null when the stream is created through
+   * the (name, location) constructor.
+   */
+  LogServiceConfiguration config;
+  /**
+   * State; assigned by {@link #init()}.
+   */
+  State state;
+
+
+  /**
+   * Creates an archived stream whose location is read from the configuration
+   * key {@link Constants#LOG_SERVICE_ARCHIVAL_LOCATION_KEY}.
+   *
+   * @param name log stream name
+   * @param config log service configuration; must not be null
+   * @throws NullPointerException if {@code config} is null
+   */
+  public ArchivedLogStreamImpl(LogName name, LogServiceConfiguration config) {
+    // config is dereferenced right here, so a null config fails with a
+    // NullPointerException before the constructor body runs; a later null
+    // check would be unreachable.
+    this(name, config.get(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY));
+    this.config = config;
+    init();
+  }
+
+  /**
+   * Creates a stream over an explicit location. This constructor does not
+   * call {@link #init()}.
+   *
+   * @param name log stream name
+   * @param location directory holding the archived files
+   */
+  protected ArchivedLogStreamImpl(LogName name, String location) {
+    this.name = name;
+    this.location = location;
+  }
+
+  /**
+   * Sets the state reported by {@link #getState()}; this implementation
+   * uses {@code ARCHIVED}.
+   */
+  protected void init() {
+    state = State.ARCHIVED;
+  }
+
+  @Override
+  public LogName getName() {
+    return name;
+  }
+
+  /**
+   * Returns the locally held state; no remote call is made.
+   */
+  @Override public State getState() throws IOException {
+    return state;
+  }
+
+  @Override
+  public long getSize() throws IOException{
+    throw new UnsupportedOperationException("getSize()");
+  }
+
+  @Override
+  public long getLength() throws IOException {
+    throw new UnsupportedOperationException("getLength()");
+  }
+
+  /**
+   * Creates a reader over the path that
+   * {@code LogServiceUtils.getArchiveLocationForLog} derives from this
+   * stream's location and name.
+   */
+  @Override
+  public ArchiveLogReader createReader() throws IOException {
+    return new ArchiveHdfsLogReader(LogServiceUtils.getArchiveLocationForLog(location, name));
+  }
+
+  @Override
+  public LogWriter createWriter() {
+    throw new UnsupportedOperationException("Archived log cannot be written");
+  }
+
+  @Override
+  public long getLastRecordId() throws IOException {
+    throw new UnsupportedOperationException("getLastRecordId()");
+  }
+
+  @Override
+  public long getStartRecordId() throws IOException {
+    throw new UnsupportedOperationException("getStartRecordId()");
+  }
+
+  @Override public Collection<RecordListener> getRecordListeners() {
+    throw new UnsupportedOperationException("get record listeners");
+  }
+
+  @Override
+  public LogServiceConfiguration getConfiguration() {
+    return config;
+  }
+
+  /**
+   * Nothing to release: this class holds no open resources of its own.
+   */
+  @Override
+  public void close() throws Exception {
+  }
+
+  @Override public void addRecordListener(RecordListener listener) {
+    throw new UnsupportedOperationException("Add record listener");
+  }
+
+  @Override public boolean removeRecordListener(RecordListener listener) {
+    throw new UnsupportedOperationException("remove record listener");
+  }
+
+  @Override
+  public RaftClient getRaftClient() {
+    throw new UnsupportedOperationException("getRaftClient()");
+  }
+
+}
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ExportedLogStreamImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ExportedLogStreamImpl.java
new file mode 100644
index 0000000..ae9a7c2
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ExportedLogStreamImpl.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link ArchivedLogStreamImpl} over an explicit export location whose
+ * state is reported as {@code CLOSED}.
+ */
+public class ExportedLogStreamImpl extends ArchivedLogStreamImpl {
+  public static final Logger LOG = LoggerFactory.getLogger(ExportedLogStreamImpl.class);
+
+  /**
+   * @param name log stream name
+   * @param location directory holding the exported files
+   */
+  public ExportedLogStreamImpl(LogName name, String location) {
+    super(name, location);
+    // The (name, location) super constructor does not call init(), so without
+    // this call the state field would stay null and getState() would return
+    // null for exported streams.
+    init();
+  }
+
+  /**
+   * Marks the exported stream as {@code CLOSED}.
+   */
+  @Override
+  protected void init() {
+    state = State.CLOSED;
+  }
+}
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
index 3394ea6..655fc95 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -30,6 +30,7 @@ import 
org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogWriter;
 import org.apache.ratis.logservice.api.RecordListener;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
 import 
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLastCommittedIndexReplyProto;
 import 
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthReplyProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogSizeReplyProto;
@@ -96,9 +97,12 @@ public class LogStreamImpl implements LogStream {
     return name;
   }
 
-  @Override
-  public State getState() {
-    return state;
+  @Override public State getState() throws IOException {
+    RaftClientReply reply = raftClient.sendReadOnly(
+        
Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name).toByteString()));
+    LogServiceProtos.GetStateReplyProto proto =
+        
LogServiceProtos.GetStateReplyProto.parseFrom(reply.getMessage().getContent());
+    return State.valueOf(proto.getState().name());
   }
 
   @Override
@@ -130,7 +134,7 @@ public class LogStreamImpl implements LogStream {
   }
 
   @Override
-  public LogReader createReader() {
+  public LogReader createReader() throws IOException{
     return new LogReaderImpl(this);
   }
 
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ArchivalInfo.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ArchivalInfo.java
new file mode 100644
index 0000000..0c2b460
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ArchivalInfo.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.server;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+
+/**
+ * Mutable holder for the progress of one archive/export request: the target
+ * location, the log name, the last archived index and the current status.
+ * Everything except the location is refreshed from an
+ * {@code ArchiveLogRequestProto} via {@link #updateArchivalInfo}.
+ */
+public class ArchivalInfo {
+
+  public enum ArchivalStatus{
+    /*
+    Initial state: the archival/export request has been submitted and
+    recorded at the leader, but the single thread responsible for archiving
+    has not started processing it yet.
+     */
+    SUBMITTED,
+    /*
+    Archiving/exporting for this request has started; a file will appear
+    in the archival location soon.
+     */
+    STARTED,
+    /*
+    Archival is ongoing and at least one file has already been rolled.
+     */
+    RUNNING,
+    /*
+    Archiving on the current leader is interrupted when it becomes a
+    follower after a re-election. A request is then submitted to the new
+    leader after some delay, to avoid a leader election storm. If that new
+    request fails, the archival status is changed to FAILED and the log
+    state goes back to CLOSED so that the user can submit the request again.
+     */
+    INTERRUPTED,
+    /*
+    The archival/export request completed successfully.
+     */
+    COMPLETED,
+    /*
+    The archival/export request failed because of an error; the worker logs
+    should contain a trace for it. After fixing the issue, the archival
+    request can be resubmitted.
+     */
+    FAILED
+  }
+  // Target location of the archive/export; set once in the constructor.
+  private String archiveLocation;
+  // The fields below are unset (null / 0) until updateArchivalInfo is called.
+  private LogName archiveLogName;
+  private long lastArchivedIndex;
+  private ArchivalStatus status;
+
+  /**
+   * @param location target location of the archive/export
+   */
+  public ArchivalInfo(String location) {
+    this.archiveLocation = location;
+  }
+
+
+  /**
+   * Copies the log name, the last archived raft index and the status from the
+   * given request into this object. The location is not changed.
+   *
+   * @param archiveLog request carrying the new values
+   * @return this instance, to allow chaining
+   */
+  public ArchivalInfo 
updateArchivalInfo(LogServiceProtos.ArchiveLogRequestProto archiveLog) {
+    this.archiveLogName = 
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+    this.lastArchivedIndex = archiveLog.getLastArchivedRaftIndex();
+    // The proto status is mapped onto the local enum by constant name.
+    this.status = ArchivalStatus.valueOf(archiveLog.getStatus().name());
+    return this;
+  }
+
+  public String getArchiveLocation() {
+    return archiveLocation;
+  }
+
+  public LogName getArchiveLogName() {
+    return archiveLogName;
+  }
+
+  public long getLastArchivedIndex() {
+    return lastArchivedIndex;
+  }
+
+  public ArchivalStatus getStatus(){
+    return status;
+  }
+
+}
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index 1596e72..50da769 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -80,6 +80,10 @@ public class LogServer extends BaseServer {
       long segmentSize = 
getConfig().getLong(Constants.RATIS_RAFT_SEGMENT_SIZE_KEY,
         Constants.DEFAULT_RATIS_RAFT_SEGMENT_SIZE);
       SizeInBytes segmentSizeBytes = SizeInBytes.valueOf(segmentSize);
+      String archiveLocation = 
getConfig().get(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY);
+      if (archiveLocation != null) {
+        properties.set(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY, 
archiveLocation);
+      }
       RaftServerConfigKeys.Log.setSegmentSizeMax(properties, segmentSizeBytes);
       RaftServerConfigKeys.Log.setPreallocatedSize(properties, 
segmentSizeBytes);
 
@@ -117,7 +121,7 @@ public class LogServer extends BaseServer {
                         if(raftGroupId.equals(logServerGroupId)) {
                             return new ManagementStateMachine();
                         }
-                        return new LogStateMachine();
+                        return new LogStateMachine(properties);
                     }
                 })
                 .setProperties(properties)
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
index 8c6e00d..b3b85ec 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
  * A reader for the {@link RaftLog} which is accessed using LogService 
recordId's instead
  * of Raft log indexes. Not thread-safe.
  */
-public class LogServiceRaftLogReader {
+public class LogServiceRaftLogReader implements  RaftLogReader{
   private static final Logger LOG = 
LoggerFactory.getLogger(LogServiceRaftLogReader.class);
   private final RaftLog raftLog;
 
@@ -55,6 +55,7 @@ public class LogServiceRaftLogReader {
    * Positions this reader just before the current recordId. Use {@link 
#next()} to get that
    * element, but take care to check if a value is present using {@link 
#hasNext()} first.
    */
+  @Override
   public void seek(long recordId) throws RaftLogIOException, 
InvalidProtocolBufferException {
     LOG.trace("Seeking to recordId={}", recordId);
     // RaftLog starting index
@@ -75,6 +76,7 @@ public class LogServiceRaftLogReader {
   /**
    * Returns true if there is a log entry to read.
    */
+  @Override
   public boolean hasNext() throws RaftLogIOException, 
InvalidProtocolBufferException {
     return currentRecord != null;
   }
@@ -83,14 +85,15 @@ public class LogServiceRaftLogReader {
    * Returns the next log entry. Ensure {@link #hasNext()} returns true before
    * calling this method.
    */
-  public ByteString next() throws RaftLogIOException, 
InvalidProtocolBufferException {
+  @Override
+  public byte[] next() throws RaftLogIOException, 
InvalidProtocolBufferException {
     if (currentRecord == null) {
       throw new NoSuchElementException();
     }
     ByteString current = currentRecord;
     currentRecord = null;
     loadNext();
-    return current;
+    return current.toByteArray();
   }
 
   /**
@@ -159,4 +162,9 @@ public class LogServiceRaftLogReader {
     }
     // If we make it here, we've read off the end of the RaftLog.
   }
+
+  @Override
+  public long getCurrentRaftIndex(){
+    return currentRaftIndex;
+  }
 }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index 9185641..6b631a0 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis.logservice.server;
 
+import static org.apache.ratis.logservice.api.LogStream.State;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -25,29 +27,49 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
-import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.logservice.api.ArchiveLogWriter;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.common.Constants;
+import org.apache.ratis.logservice.impl.ArchiveHdfsLogReader;
+import org.apache.ratis.logservice.impl.ArchiveHdfsLogWriter;
 import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
 import 
org.apache.ratis.logservice.proto.LogServiceProtos.AppendLogEntryRequestProto;
-import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogReplyProto;
-import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogRequestProto;
 import 
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthRequestProto;
 import 
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogSizeRequestProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.GetStateRequestProto;
 import 
org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.ReadLogRequestProto;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import org.apache.ratis.logservice.util.LogServiceUtils;
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.logservice.server.ArchivalInfo.ArchivalStatus;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.impl.ServerState;
@@ -66,6 +88,8 @@ import org.slf4j.LoggerFactory;
 
 public class LogStateMachine extends BaseStateMachine {
   public static final Logger LOG = 
LoggerFactory.getLogger(LogStateMachine.class);
+  public static final long DEFAULT_ARCHIVE_THRESHOLD_PER_FILE = 1000000;
+  private final RaftProperties properties;
   private RatisMetricRegistry metricRegistry;
   private Timer sizeRequestTimer;
   private Timer readNextQueryTimer;
@@ -75,12 +99,10 @@ public class LogStateMachine extends BaseStateMachine {
   private Timer startIndexTimer;
   private Timer appendRequestTimer;
   private Timer syncRequesTimer;
+  private Timer archiveLogRequestTimer;
   private Timer getCloseLogTimer;
-
-  public static enum State {
-    OPEN, CLOSED
-  }
-
+  private RaftClient client;
+  //Archival information
   /*
    *  State is a log's length, size, and state (closed/open);
    */
@@ -99,9 +121,18 @@ public class LogStateMachine extends BaseStateMachine {
 
   private RaftLog log;
 
-  private RaftGroupId groupId;
 
   private RaftServerProxy proxy ;
+  private ExecutorService executorService;
+  private boolean isArchivalRequest;
+  private ArchivalInfo archivalInfo;
+  private Map<String,ArchivalInfo> exportMap = new 
HashMap<String,ArchivalInfo>();
+  private Map<String, Future<Boolean>> archiveExportFutures = new HashMap<>();
+  private Timer archiveLogTimer;
+
+  public LogStateMachine(RaftProperties properties) {
+    this.properties = properties;
+  }
 
   private AutoCloseableLock readLock() {
     return AutoCloseableLock.acquire(lock.readLock());
@@ -126,7 +157,6 @@ public class LogStateMachine extends BaseStateMachine {
     super.initialize(server, groupId, raftStorage);
     this.storage.init(raftStorage);
     this.proxy = (RaftServerProxy) server;
-    this.groupId = groupId;
     //TODO: using groupId for metric now but better to tag it with LogName
     this.metricRegistry = LogServiceMetricsRegistry
         .createMetricRegistryForLogService(groupId.toString(), 
server.getId().toString());
@@ -139,7 +169,15 @@ public class LogStateMachine extends BaseStateMachine {
     this.syncRequesTimer = metricRegistry.timer("syncRequesTime");
     this.appendRequestTimer = metricRegistry.timer("appendRequestTime");
     this.getCloseLogTimer = metricRegistry.timer("getCloseLogTime");
+    //archiving request time not the actual archiving time
+    this.archiveLogRequestTimer = 
metricRegistry.timer("archiveLogRequestTime");
+    this.archiveLogTimer = metricRegistry.timer("archiveLogTime");
     loadSnapshot(storage.getLatestSnapshot());
+    executorService = Executors.newSingleThreadExecutor();
+    this.archivalInfo =
+        new 
ArchivalInfo(properties.get(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY));
+
+
   }
 
   private void checkInitialization() throws IOException {
@@ -266,7 +304,14 @@ public class LogStateMachine extends BaseStateMachine {
               return processGetLengthRequest(logServiceRequestProto);
             }
           });
-        default:
+      case ARCHIVELOG:
+        return recordTime(archiveLogRequestTimer, new Task(){
+          @Override public CompletableFuture<Message> run() {
+            return processArchiveLog(logServiceRequestProto);
+          }});
+      case EXPORTINFO:
+        return processExportInfo(logServiceRequestProto);
+      default:
           // TODO
           throw new RuntimeException(
             "Wrong message type for query: " + 
logServiceRequestProto.getRequestCase());
@@ -279,6 +324,19 @@ public class LogStateMachine extends BaseStateMachine {
 
   }
 
+  /**
+   * Builds a reply that lists every export currently tracked in
+   * {@code exportMap}. The content of the request is not consulted.
+   *
+   * @param logServiceRequestProto the export-info request
+   * @return an already completed future holding the serialized reply
+   */
+  private CompletableFuture<Message> processExportInfo(
+      LogServiceRequestProto logServiceRequestProto) {
+    LogServiceProtos.GetExportInfoReplyProto.Builder exportBuilder =
+        LogServiceProtos.GetExportInfoReplyProto.newBuilder();
+    // Plain loop: the builder is filled by side effect, so a stream
+    // map/collect pipeline with a discarded result is not appropriate here.
+    for (ArchivalInfo exportInfo : exportMap.values()) {
+      exportBuilder.addInfo(LogServiceProtoUtil.toExportInfoProto(exportInfo));
+    }
+
+    return CompletableFuture.completedFuture(
+        Message.valueOf(exportBuilder.build().toByteString()));
+  }
+
   /**
    * Process get start index request
    * @param proto message
@@ -302,7 +360,6 @@ public class LogStateMachine extends BaseStateMachine {
   private CompletableFuture<Message>
       processGetLastCommittedIndexRequest(LogServiceRequestProto proto)
   {
-
     Throwable t = verifyState(State.OPEN);
     long lastIndex = log.getLastCommittedIndex();
     return CompletableFuture.completedFuture(Message
@@ -340,19 +397,34 @@ public class LogStateMachine extends BaseStateMachine {
     long startRecordId = msgProto.getStartRecordId();
     // And the number of records they want to read
     int numRecordsToRead = msgProto.getNumRecords();
-    Throwable t = verifyState(State.OPEN);
+    //Log must have been closed while Archiving , so we can let user only to
+    // read when the log is either OPEN or ARCHIVED
+    Throwable t = verifyState(State.OPEN, State.ARCHIVING, State.CLOSED, 
State.ARCHIVED);
     List<byte[]> list = null;
 
     if (t == null) {
-      LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
-      list = new ArrayList<byte[]>();
+      RaftLogReader reader = null;
       try {
-        reader.seek(startRecordId);
-        for (int i = 0; i < numRecordsToRead; i++) {
-          if (!reader.hasNext()) {
-            break;
+        if (this.state == State.OPEN || this.state == State.CLOSED
+            || this.state == State.ARCHIVING) {
+          reader = new LogServiceRaftLogReader(log);
+        } else if (this.state == State.ARCHIVED) {
+          reader = new ArchiveHdfsLogReader(LogServiceUtils
+              .getArchiveLocationForLog(archivalInfo.getArchiveLocation(),
+                  archivalInfo.getArchiveLogName()));
+        } else {
+          //could be a race condition
+          t = verifyState(State.OPEN, State.ARCHIVED);
+        }
+        if (t == null && reader != null) {
+          list = new ArrayList<byte[]>();
+          reader.seek(startRecordId);
+          for (int i = 0; i < numRecordsToRead; i++) {
+            if (!reader.hasNext()) {
+              break;
+            }
+            list.add(reader.next());
           }
-          list.add(reader.next().toByteArray());
         }
       } catch (Exception e) {
         LOG.error("Failed to execute ReadNextQuery", e);
@@ -425,10 +497,10 @@ public class LogStateMachine extends BaseStateMachine {
       LogServiceRequestProto logServiceRequestProto =
           
LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
       switch (logServiceRequestProto.getRequestCase()) {
-        case CLOSELOG:
+      case CHANGESTATE:
           return recordTime(getCloseLogTimer, new Task(){
             @Override public CompletableFuture<Message> run() {
-              return processCloseLog(logServiceRequestProto);
+              return processChangeState(logServiceRequestProto);
             }});
         case APPENDREQUEST:
           return recordTime(appendRequestTimer, new Task(){
@@ -440,6 +512,8 @@ public class LogStateMachine extends BaseStateMachine {
             @Override public CompletableFuture<Message> run() {
               return processSyncRequest(trx, logServiceRequestProto);
             }});
+        case ARCHIVELOG:
+          return updateArchiveLogInfo(logServiceRequestProto);
         default:
           //TODO
           return null;
@@ -452,29 +526,256 @@ public class LogStateMachine extends BaseStateMachine {
 
 
 
-  private CompletableFuture<Message> processCloseLog(LogServiceRequestProto 
logServiceRequestProto) {
-    CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+  private CompletableFuture<Message> processChangeState(LogServiceRequestProto 
logServiceRequestProto) {
+    LogServiceProtos.ChangeStateLogRequestProto changeState = 
logServiceRequestProto.getChangeState();
     // Need to check whether the file is opened if opened close it.
     // TODO need to handle exceptions while operating with files.
+
+    State targetState = State.valueOf(changeState.getState().name());
+    //if forced skip checking states
+    if(!changeState.getForce()) {
+      switch (targetState) {
+      case OPEN:
+        if (state != null) {
+          verifyState(State.OPEN, State.CLOSED);
+        }
+        break;
+      case CLOSED:
+        verifyState(State.OPEN);
+        break;
+      case ARCHIVED:
+        verifyState(State.ARCHIVING);
+        break;
+      case ARCHIVING:
+        verifyState(State.CLOSED);
+        break;
+      case DELETED:
+        verifyState(State.CLOSED);
+        break;
+      }
+    }
+    this.state = targetState;
     return CompletableFuture.completedFuture(Message
-      .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+        
.valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().build().toByteString()));
   }
 
-
-
   private CompletableFuture<Message> processGetStateRequest(
       LogServiceRequestProto logServiceRequestProto) {
     GetStateRequestProto getState = logServiceRequestProto.getGetState();
-    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-        .toGetStateReplyProto(state == State.OPEN).toByteString()));
+    return CompletableFuture.completedFuture(Message
+        
.valueOf(LogServiceProtoUtil.toGetStateReplyProto(state).toByteString()));
   }
 
-  private Throwable verifyState(State state) {
-       if (this.state != state) {
-          return new IOException("Wrong state: " + this.state);
-        }
+  private Throwable verifyState(State... states) {
+    for (State state : states) {
+      if (this.state == state) {
         return null;
-   }
+      }
+    }
+    return new IOException("Wrong state: " + this.state);
+  }
 
+  /**
+   * Records the progress carried by an archive/export request. For an
+   * archival request the shared {@code archivalInfo} is updated; for an
+   * export request the entry of {@code exportMap} keyed by the request's
+   * location is created if missing and then updated. The outcome of the state
+   * check is returned in the reply; the info is updated either way.
+   *
+   * @param logServiceRequestProto request wrapping the archive-log message
+   * @return an already completed future holding the serialized reply
+   */
+  private CompletableFuture<Message> updateArchiveLogInfo(
+      LogServiceRequestProto logServiceRequestProto) {
+    final LogServiceProtos.ArchiveLogRequestProto request =
+        logServiceRequestProto.getArchiveLog();
+    this.isArchivalRequest = !request.getIsExport();
+    final Throwable error;
+    if (isArchivalRequest) {
+      archivalInfo.updateArchivalInfo(request);
+      error = verifyState(State.ARCHIVING);
+    } else {
+      error = verifyState(State.OPEN, State.CLOSED);
+      exportMap.computeIfAbsent(request.getLocation(), ArchivalInfo::new)
+          .updateArchivalInfo(request);
+    }
+    return CompletableFuture.completedFuture(
+        Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(error).toByteString()));
+  }
+
+  private CompletableFuture<Message> processArchiveLog(
+      LogServiceRequestProto logServiceRequestProto) {
+    LogServiceProtos.ArchiveLogRequestProto archiveLog = 
logServiceRequestProto.getArchiveLog();
+    LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+    Throwable t = null;
+    try {
+      String loc = null;
+      this.isArchivalRequest = !archiveLog.getIsExport();
+      if (isArchivalRequest) {
+        loc = archivalInfo.getArchiveLocation();
+        archivalInfo.updateArchivalInfo(archiveLog);
+      } else {
+        loc = archiveLog.getLocation();
+        ArchivalInfo exportInfo =
+            exportMap.putIfAbsent(loc, new ArchivalInfo(loc));
+        if (exportInfo != null && exportInfo.getLastArchivedIndex() == 
archiveLog
+            .getLastArchivedRaftIndex()) {
+          throw new IllegalStateException("Export of " + logName + "for the 
given location " + loc
+              + "is already present and in " + exportInfo.getStatus());
+        }
+        exportInfo.updateArchivalInfo(archiveLog);
+      }
+      if (loc == null) {
+        throw new IllegalArgumentException(isArchivalRequest ?
+            "Location for archive is not configured" :
+            "Location for export provided is null");
+      }
+      final String location = loc;
+      long recordId = archiveLog.getLastArchivedRaftIndex();
+      if (isArchivalRequest) {
+        t = verifyState(State.CLOSED);
+      } else {
+        t = verifyState(State.OPEN, State.CLOSED);
+      }
+      if (t == null) {
+        Callable<Boolean> callable = () -> {
+          final Timer.Context timerContext = archiveLogTimer.time();
+          try {
+            startArchival(recordId, logName, location);
+            //Init ArchiveLogWriter for writing in export/archival location
+            ArchiveLogWriter writer = new ArchiveHdfsLogWriter();
+            writer.init(location, logName);
+
+            LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
+            reader.seek(recordId);
+            long records = 0;
+            boolean isInterrupted = false;
+            while (reader.hasNext()) {
+              writer.write(ByteBuffer.wrap(reader.next()));
+              isInterrupted = Thread.currentThread().isInterrupted();
+              if (records >= DEFAULT_ARCHIVE_THRESHOLD_PER_FILE || 
isInterrupted) {
+                //roll writer when interuppted or no. of records threshold per 
file is met
+                commit(writer, logName, location);
+                if (isInterrupted) {
+                  break;
+                }
+                records = 0;
+              }
+              records++;
+            }
+            writer.close();
+            if (!isInterrupted) {
+              //It means archival is successfully completed on this leader
+              completeArchival(writer.getLastWrittenRecordId(), logName, 
location);
+            } else {
+              //Thread is interrupted: either the leader is going down or it has become 
follower
+              try {
+                //Sleeping here before sending archival request to new leader 
to
+                // avoid causing problems during a leader election storm
+                Thread.sleep(10000);
+              } catch (InterruptedException e) {
+              }
+              
sendArchiveLogrequestToNewLeader(writer.getLastWrittenRecordId(), logName, 
location);
+            }
+            return true;
+          } catch (Exception e) {
+            LOG.error("Archival failed for the log:" + logName, e);
+            failArchival(recordId, logName, location);
+          } finally {
+            timerContext.stop();
+          }
+          return false;
+        };
+
+        archiveExportFutures.put(location, executorService.submit(callable));
+      }
+    }catch (Exception e){
+      LOG.warn("Exception while processing archival request for " + logName, 
e);
+      t = e;
+    }
+    return CompletableFuture.completedFuture(
+        
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+  }
+
+  private void failArchival(long recordId, LogName logName, String location) 
throws IOException {
+    updateArchivingInfo(recordId, logName, location, isArchivalRequest,
+        ArchivalStatus.FAILED);
+    if(isArchivalRequest) {
+      sendChangeStateRequest(State.CLOSED, true);
+    }
+  }
+
+  private void startArchival(long recordId, LogName logName, String location) 
throws IOException {
+    if (isArchivalRequest) {
+      sendChangeStateRequest(State.ARCHIVING, false);
+    }
+    updateArchivingInfo(recordId, logName, location, isArchivalRequest, 
ArchivalStatus.STARTED);
+  }
+
+  private void sendArchiveLogrequestToNewLeader(long recordId, LogName 
logName, String location)
+      throws IOException {
+    getClient().sendReadOnly(() -> LogServiceProtoUtil
+        .toArchiveLogRequestProto(logName, location, recordId, 
isArchivalRequest,
+            ArchivalStatus.INTERRUPTED).toByteString());
+  }
+
+  public void completeArchival(long recordId, LogName logName, String 
location) throws IOException {
+    if (isArchivalRequest) {
+      sendChangeStateRequest(State.ARCHIVED, false);
+    }
+    updateArchivingInfo(recordId, logName, location, isArchivalRequest, 
ArchivalStatus.COMPLETED);
+  }
+
+  private void commit(ArchiveLogWriter writer, LogName logName, String 
location)
+      throws IOException {
+    writer.rollWriter();
+    updateArchivingInfo(writer.getLastWrittenRecordId(), logName, location, 
isArchivalRequest,
+        ArchivalStatus.RUNNING);
+  }
+
+  private void updateArchivingInfo(long recordId, LogName logName, String 
location,
+      boolean isArchival, ArchivalStatus status)
+      throws IOException {
+    RaftClientReply archiveLogReply = getClient().send(() -> 
LogServiceProtoUtil
+        .toArchiveLogRequestProto(logName, location, recordId, isArchival, 
status).toByteString());
+    LogServiceProtos.ArchiveLogReplyProto 
message=LogServiceProtos.ArchiveLogReplyProto
+        .parseFrom(archiveLogReply.getMessage().getContent());
+    if (message.hasException()) {
+      throw new IOException(message.getException().getErrorMsg());
+    }
+  }
+
+  private void sendChangeStateRequest(State state, boolean force) throws 
IOException {
+      getClient().send(
+          () -> 
LogServiceProtoUtil.toChangeStateRequestProto(LogName.of("Dummy"), state, force)
+              .toByteString());
+  }
+
+  private RaftClient getClient() throws IOException {
+    if (client == null) {
+      try {
+        RaftServer raftServer = server.get();
+        client = 
RaftClient.newBuilder().setRaftGroup(getGroupFromGroupId(raftServer, groupId))
+            .setClientId(ClientId.randomId())
+            .setProperties(raftServer.getProperties()).build();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+    return client;
+  }
+
+  private RaftGroup getGroupFromGroupId(RaftServer raftServer, RaftGroupId 
raftGroupId)
+      throws IOException {
+    List<RaftGroup> x = 
StreamSupport.stream(raftServer.getGroups().spliterator(), false)
+        .filter(group -> 
group.getGroupId().equals(raftGroupId)).collect(Collectors.toList());
+    if (x.size() == 1) {
+      return x.get(0);
+    } else {
+      throw new IOException(x.size() + " are group found for group id:" + 
raftGroupId);
+    }
+  }
+
+  @Override public void notifyNotLeader(Collection<TransactionContext> 
pendingEntries)
+      throws IOException {
+    for(Future<Boolean> archiveFuture:archiveExportFutures.values()) {
+      if (archiveFuture != null && !archiveFuture.isCancelled()) {
+        archiveFuture.cancel(true);
+      }
+    }
+  }
 
 }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 7caa63a..5e0785b 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -42,8 +42,6 @@ import 
org.apache.ratis.logservice.common.LogNotFoundException;
 import org.apache.ratis.logservice.common.NoEnoughWorkersException;
 import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
 import org.apache.ratis.logservice.proto.MetaServiceProtos;
-import 
org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogReplyProto;
-import 
org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogRequestProto;
 import 
org.apache.ratis.logservice.proto.MetaServiceProtos.CreateLogRequestProto;
 import 
org.apache.ratis.logservice.proto.MetaServiceProtos.DeleteLogRequestProto;
 import 
org.apache.ratis.logservice.proto.MetaServiceProtos.LogServicePingRequestProto;
@@ -211,8 +209,6 @@ public class MetaStateMachine extends BaseStateMachine {
                 return processListLogsRequest();
             case GETLOG:
                 return processGetLogRequest(req);
-            case ARCHIVELOG:
-                return processArchiveLog(req);
             case DELETELOG:
                 return processDeleteLog(req);
             default:
@@ -263,6 +259,8 @@ public class MetaStateMachine extends BaseStateMachine {
                                         
.setLogname(LogServiceProtoUtil.toLogNameProto(logName)))
                         .build().toByteString());
             } catch (IOException e) {
+                LOG.error(
+                    "Exception while unregistring raft group with Metadata 
Service during deletion of log");
                 e.printStackTrace();
             }
         }
@@ -279,14 +277,6 @@ public class MetaStateMachine extends BaseStateMachine {
 //                
.valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
 //    }
 
-    private CompletableFuture<Message>
-    processArchiveLog(MetaServiceProtos.MetaServiceRequestProto 
logServiceRequestProto) {
-        ArchiveLogRequestProto archiveLog = 
logServiceRequestProto.getArchiveLog();
-        LogName logName = 
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
-        // Handle log archiving.
-        return CompletableFuture.completedFuture(Message
-                
.valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString()));
-    }
 
 //    private CompletableFuture<Message> processGetStateRequest(
 //            MetaServiceProtos.MetaServiceRequestProto 
logServiceRequestProto) {
@@ -345,6 +335,8 @@ public class MetaStateMachine extends BaseStateMachine {
                                             .toRaftGroupProto(raftGroup)))
                             .build().toByteString());
                 } catch (IOException e) {
+                    LOG.error(
+                        "Exception while registring raft group with Metadata 
Service during creation of log");
                     e.printStackTrace();
                 }
                 return 
CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/RaftLogReader.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/RaftLogReader.java
new file mode 100644
index 0000000..5aa6672
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/RaftLogReader.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.server;
+
+import java.io.IOException;
+
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+public interface RaftLogReader {
+
+  /**
+   * Positions this reader just before the current recordId. Use {@link 
#next()} to get that
+   * element, but take care to check if a value is present using {@link 
#hasNext()} first.
+   */
+  public void seek(long recordId) throws IOException;
+
+  /**
+   * Returns true if there is a log entry to read.
+   */
+  public boolean hasNext() throws IOException;
+
+  /**
+   * Returns the next log entry. Ensure {@link #hasNext()} returns true before
+   * calling this method.
+   */
+  public byte[] next() throws IOException;
+
+  /**
+   * Returns the current raft index read.
+   * @return the current raft index read
+   */
+  public long getCurrentRaftIndex();
+
+  }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/CommandFactory.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/CommandFactory.java
index 873cf6b..319857d 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/CommandFactory.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/CommandFactory.java
@@ -22,9 +22,11 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import org.apache.ratis.logservice.shell.commands.ArchiveLogCommand;
 import org.apache.ratis.logservice.shell.commands.CreateLogCommand;
 import org.apache.ratis.logservice.shell.commands.DeleteLogCommand;
 import org.apache.ratis.logservice.shell.commands.ExitCommand;
+import org.apache.ratis.logservice.shell.commands.ExportLogCommand;
 import org.apache.ratis.logservice.shell.commands.HelpCommand;
 import org.apache.ratis.logservice.shell.commands.ListLogsCommand;
 import org.apache.ratis.logservice.shell.commands.PutToLogCommand;
@@ -46,6 +48,8 @@ public class CommandFactory {
     commands.put("quit", exitCommand);
     commands.put("help", new HelpCommand());
     commands.put("list", new ListLogsCommand());
+    commands.put("archive", new ArchiveLogCommand());
+    commands.put("export", new ExportLogCommand());
 
     return Collections.unmodifiableMap(commands);
   }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ArchiveLogCommand.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ArchiveLogCommand.java
new file mode 100644
index 0000000..11e60b4
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ArchiveLogCommand.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.shell.commands;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.client.LogServiceClient;
+import org.apache.ratis.logservice.shell.Command;
+import org.jline.reader.LineReader;
+import org.jline.terminal.Terminal;
+
+public class ArchiveLogCommand implements Command {
+
+  @Override public String getHelpMessage() {
+    return "`archive` - archive the given log at already configured location";
+  }
+
+  @Override
+  public void run(Terminal terminal, LineReader lineReader, LogServiceClient 
client, String[] args) {
+    if (args.length != 1) {
+      terminal.writer().println("ERROR - Usage: archive <name> ");
+      return;
+    }
+    String logName = args[0];
+    try {
+      client.archiveLog(LogName.of(logName));
+      terminal.writer().println("Archive Log request is submitted 
successfully!!");
+    } catch (Exception e) {
+      terminal.writer().println("Failed to archive log!!");
+      e.printStackTrace(terminal.writer());
+    }
+  }
+}
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ExportLogCommand.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ExportLogCommand.java
new file mode 100644
index 0000000..11c1df7
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ExportLogCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.shell.commands;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.client.LogServiceClient;
+import org.apache.ratis.logservice.shell.Command;
+import org.jline.reader.LineReader;
+import org.jline.terminal.Terminal;
+
+public class ExportLogCommand implements Command {
+
+  @Override public String getHelpMessage() {
+    return "`export` - export the given log at given location starting from 
provided recordId";
+  }
+
+  @Override
+  public void run(Terminal terminal, LineReader lineReader, LogServiceClient 
client, String[] args) {
+    if (args.length != 3) {
+      terminal.writer().println("ERROR - Usage: export <name> <location> 
<recordId>");
+      return;
+    }
+    String logName = args[0];
+    String location = args[1];
+    long recordId = Long.parseLong(args[2]);
+    try {
+      client.exportLog(LogName.of(logName), location,recordId);
+      terminal.writer().println("Export Log request is submitted 
successfully!!");
+    } catch (Exception e) {
+      terminal.writer().println("Failed to export log!!");
+      e.printStackTrace(terminal.writer());
+    }
+  }
+}
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
index 2cc4c58..8b8356e 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -29,7 +29,7 @@ import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
 import org.apache.ratis.logservice.proto.LogServiceProtos;
 import org.apache.ratis.logservice.proto.LogServiceProtos.*;
-import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
+import org.apache.ratis.logservice.server.ArchivalInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 public class LogServiceProtoUtil {
@@ -56,11 +56,17 @@ public class LogServiceProtoUtil {
     return logStreamProto;
   }
 
-  public static LogServiceRequestProto toCloseLogRequestProto(LogName logName) 
{
+  public static LogServiceRequestProto toChangeStateRequestProto(LogName 
logName, State state,
+      boolean force) {
     LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
-    CloseLogRequestProto closeLog =
-        CloseLogRequestProto.newBuilder().setLogName(logNameProto).build();
-    return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build();
+    ChangeStateLogRequestProto changeLog =
+        ChangeStateLogRequestProto.newBuilder().setLogName(logNameProto)
+            .setState(LogStreamState.valueOf(state.name())).build();
+    return 
LogServiceRequestProto.newBuilder().setChangeState(changeLog).build();
+  }
+
+  public static LogServiceRequestProto toChangeStateRequestProto(LogName 
logName, State state) {
+    return toChangeStateRequestProto(logName, state, false);
   }
 
   public static LogServiceRequestProto toGetStateRequestProto(LogName logName) 
{
@@ -70,8 +76,11 @@ public class LogServiceProtoUtil {
     return LogServiceRequestProto.newBuilder().setGetState(getState).build();
   }
 
-  public static ArchiveLogReplyProto toArchiveLogReplyProto() {
+  public static ArchiveLogReplyProto toArchiveLogReplyProto(Throwable t) {
     ArchiveLogReplyProto.Builder builder = ArchiveLogReplyProto.newBuilder();
+    if (t != null) {
+      builder.setException(toLogException(t));
+    }
     return builder.build();
   }
 
@@ -163,11 +172,10 @@ public class LogServiceProtoUtil {
     return retVal;
   }
 
-  public static GetStateReplyProto toGetStateReplyProto(boolean exists) {
-    return GetStateReplyProto.newBuilder().build();
+  public static GetStateReplyProto toGetStateReplyProto(State state) {
+    return 
GetStateReplyProto.newBuilder().setState(LogStreamState.valueOf(state.name())).build();
   }
 
-
   public static GetLogLengthReplyProto toGetLogLengthReplyProto(long length, 
Throwable t) {
     GetLogLengthReplyProto.Builder builder = 
GetLogLengthReplyProto.newBuilder();
     if (t != null) {
@@ -245,6 +253,19 @@ public class LogServiceProtoUtil {
     return builder.build();
   }
 
+  public static ArchiveLogRequestProto toExportInfoProto(ArchivalInfo info) {
+    return ArchiveLogRequestProto.newBuilder().setIsExport(true)
+        .setLastArchivedRaftIndex(info.getLastArchivedIndex())
+        .setLocation(info.getArchiveLocation()).setLogName(
+            
LogServiceProtos.LogNameProto.newBuilder().setName(info.getArchiveLogName().getName())
+                
.build()).setStatus(ArchivalStatus.valueOf(info.getStatus().name())).build();
+  }
+
+  public static ArchivalInfo toExportInfo(ArchiveLogRequestProto proto){
+    return new ArchivalInfo(proto.getLocation()).updateArchivalInfo(proto);
+
+  }
+
   public GetLogLengthReplyProto toGetLogLengthReplyProto(long length) {
     GetLogLengthReplyProto.Builder builder = 
GetLogLengthReplyProto.newBuilder();
     builder.setLength(length);
@@ -265,4 +286,26 @@ public class LogServiceProtoUtil {
     return builder.build();
   }
 
+  public static LogServiceRequestProto toExportInfoRequestProto(LogName 
logName){
+    LogServiceProtos.LogNameProto logNameProto =
+        
LogServiceProtos.LogNameProto.newBuilder().setName(logName.getName()).build();
+    GetExportInfoRequestProto exportInfoRequestProto =
+        
GetExportInfoRequestProto.newBuilder().setLogName(logNameProto).build();
+    return 
LogServiceRequestProto.newBuilder().setExportInfo(exportInfoRequestProto).build();
+  }
+
+  public static LogServiceRequestProto toArchiveLogRequestProto(LogName 
logName, String location,
+      long raftIndex, boolean isArchival, ArchivalInfo.ArchivalStatus status) {
+    LogServiceProtos.LogNameProto logNameProto =
+        
LogServiceProtos.LogNameProto.newBuilder().setName(logName.getName()).build();
+    ArchiveLogRequestProto.Builder builder =
+        ArchiveLogRequestProto.newBuilder().setLogName(logNameProto)
+            
.setLastArchivedRaftIndex(raftIndex).setStatus(ArchivalStatus.valueOf(status.name()));
+    builder.setIsExport(!isArchival);
+    if (location != null) {
+      builder.setLocation(location);
+    }
+    ArchiveLogRequestProto archiveLog = builder.build();
+    return 
LogServiceRequestProto.newBuilder().setArchiveLog(archiveLog).build();
+  }
 }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
index c44853f..f5c6d01 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.ratis.logservice.util;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 
@@ -50,4 +52,22 @@ public class LogServiceUtils {
         }
 
     }
+
+    public static String getArchiveLocationForLog(String location, LogName 
logName) {
+        return location + "/" + logName.getName();
+    }
+
+    public static String getRolledPathForArchiveWriter(Path path, long 
lastWrittenId) {
+        return path + "_recordId_" + lastWrittenId;
+    }
+
+    public static Integer getRecordIdFromRolledArchiveFile(Path path) {
+        String[] splits = path.getName().toString().split("_recordId_");
+        if (splits.length != 2) {
+            //currently written file, should be read last
+            return Integer.MAX_VALUE;
+        }
+        return Integer.parseInt(splits[1]);
+    }
+
 }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
index 82f530e..90227b1 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
@@ -101,15 +101,6 @@ public class MetaServiceProtoUtil {
         return MetaServiceRequestProto.newBuilder().setGetLog(getLog).build();
     }
 
-    public static MetaServiceRequestProto toArchiveLogRequestProto(LogName 
logName) {
-        LogServiceProtos.LogNameProto logNameProto = 
LogServiceProtos.LogNameProto.newBuilder()
-                .setName(logName.getName())
-                .build();
-        ArchiveLogRequestProto archiveLog =
-                
ArchiveLogRequestProto.newBuilder().setLogName(logNameProto).build();
-        return 
MetaServiceRequestProto.newBuilder().setArchiveLog(archiveLog).build();
-    }
-
     public static MetaServiceRequestProto toDeleteLogRequestProto(LogName 
logName) {
         LogServiceProtos.LogNameProto logNameProto = 
LogServiceProtos.LogNameProto.newBuilder()
                 .setName(logName.getName())
diff --git a/ratis-logservice/src/main/proto/LogService.proto 
b/ratis-logservice/src/main/proto/LogService.proto
index 2dc4a32..5c06f95 100644
--- a/ratis-logservice/src/main/proto/LogService.proto
+++ b/ratis-logservice/src/main/proto/LogService.proto
@@ -35,17 +35,32 @@ message LogStreamProto {
 enum LogStreamState {
   OPEN = 0;
   CLOSED = 1;
+  ARCHIVING = 2;
+  ARCHIVED = 3;
+  DELETED = 4;
 }
 
-message CloseLogRequestProto {
+enum ArchivalStatus {
+       SUBMITTED = 0;
+       STARTED = 1;
+       RUNNING = 2;
+       INTERRUPTED = 3;
+       COMPLETED = 4;
+       FAILED = 5;
+}
+
+message ChangeStateLogRequestProto {
   LogNameProto logName = 1;
+  LogStreamState state = 2;
+  LogServiceException exception = 3;
+  bool force = 4;
 }
 
 message GetStateRequestProto {
   LogNameProto logName = 1;
 }
 
-message CloseLogReplyProto {
+message ChangeStateReplyProto {
 }
 
 message GetStateReplyProto {
@@ -143,9 +158,30 @@ message GetLogLastCommittedIndexReplyProto {
        LogServiceException exception = 2;
 }
 
+message ArchiveLogRequestProto {
+       LogNameProto logName = 1;
+       string location = 2;
+       uint64 lastArchivedRaftIndex = 3;
+       bool isExport = 4;
+       ArchivalStatus status = 5;
+}
+
+message ArchiveLogReplyProto {
+       LogServiceException exception = 1;
+}
+
+message GetExportInfoRequestProto {
+       LogNameProto logName = 1;
+}
+
+message GetExportInfoReplyProto {
+       repeated ArchiveLogRequestProto info = 1;
+       LogServiceException exception = 2;
+}
+
 message LogServiceRequestProto {
   oneof Request {
-    CloseLogRequestProto closeLog = 1;
+    ChangeStateLogRequestProto changeState = 1;
     GetStateRequestProto getState = 2;
     ReadLogRequestProto readNextQuery = 3;
     GetLogLengthRequestProto lengthQuery = 4;
@@ -154,6 +190,8 @@ message LogServiceRequestProto {
        SyncLogRequestProto          syncRequest = 7;
        GetLogLastCommittedIndexRequestProto lastIndexQuery = 8;
        GetLogSizeRequestProto sizeRequest = 9;
+       ArchiveLogRequestProto archiveLog = 10;
+       GetExportInfoRequestProto exportInfo= 11;
   }
 }
 
diff --git a/ratis-logservice/src/main/proto/MetaService.proto 
b/ratis-logservice/src/main/proto/MetaService.proto
index 7c94d1b..16e0232 100644
--- a/ratis-logservice/src/main/proto/MetaService.proto
+++ b/ratis-logservice/src/main/proto/MetaService.proto
@@ -54,10 +54,6 @@ message GetLogRequestProto {
   LogNameProto logName = 1;
 }
 
-message ArchiveLogRequestProto {
-  LogNameProto logName = 1;
-}
-
 message DeleteLogRequestProto {
   LogNameProto logName = 1;
 }
@@ -78,10 +74,6 @@ message ListLogsReplyProto {
 
 }
 
-message ArchiveLogReplyProto {
-  MetaServiceExceptionProto exception = 1;
-}
-
 message DeleteLogReplyProto {
   MetaServiceExceptionProto exception = 1;
 }
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index 925ebac..090e1eb 100644
--- 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -84,7 +84,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends 
MiniRaftCluster>
       return super.getStartRecordId();
     }
 
-    @Override public State getState() {
+    @Override public State getState() throws IOException {
       getStateCount++;
       return super.getState();
     }
@@ -110,7 +110,8 @@ public abstract class LogServiceReadWriteBase<CLUSTER 
extends MiniRaftCluster>
   }
   @Before
   public void setUpCluster() throws IOException, InterruptedException {
-    cluster = newCluster(NUM_PEERS);
+    RaftProperties raftProperties = getProperties();
+    cluster = getFactory().newCluster(NUM_PEERS, raftProperties);
     cluster.start();
     RaftTestUtil.waitForLeader(cluster);
   }
@@ -166,6 +167,8 @@ public abstract class LogServiceReadWriteBase<CLUSTER 
extends MiniRaftCluster>
         assertEquals(expected, actual);
       }
       testJMXMetrics(logStream);
+      assertEquals(logStream.getState(),State.OPEN);
+
     }
   }
 
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/impl/TestArchiveHdfsLogReaderAndWriter.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/impl/TestArchiveHdfsLogReaderAndWriter.java
new file mode 100644
index 0000000..4213ac8
--- /dev/null
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/impl/TestArchiveHdfsLogReaderAndWriter.java
@@ -0,0 +1,153 @@
+package org.apache.ratis.logservice.impl;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Bytes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.ratis.logservice.api.ArchiveLogReader;
+import org.apache.ratis.logservice.api.ArchiveLogWriter;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestArchiveHdfsLogReaderAndWriter {
+  static MiniDFSCluster cluster;
+  static Configuration conf;
+  private static String location;
+
+  @BeforeClass public static void setup() throws IOException {
+    conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    location = "target/tmp/archive/TestArchiveHdfsLogReaderAndWriter";
+  }
+
+  @Test public void testRollingWriter() throws IOException {
+    String archiveLocation = location+"/testRollingWriter";
+    LogName logName = LogName.of("testRollingWriterLogName");
+    DistributedFileSystem fs = cluster.getFileSystem();
+    fs.delete(new Path(archiveLocation), true);
+    ArchiveLogWriter writer = new ArchiveHdfsLogWriter(conf);
+    writer.init(archiveLocation, logName);
+    int k = 2;
+    write(writer, 1, k);
+    Assert.assertEquals(writer.getLastWrittenRecordId(), k);
+    writer.rollWriter();
+    String[] files = Arrays.stream(
+        fs.listStatus(new Path(LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName))))
+        .map(fileStatus -> fileStatus.getPath().getName()).toArray(String[]::new);
+    String[] expectedFiles = { logName.getName(), logName.getName() + "_recordId_" + k };
+    Assert.assertArrayEquals(expectedFiles, files);
+    ArchiveLogReader reader = new ArchiveHdfsLogReader(conf,
+        LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
+    verifyRecords(reader, k);
+    Assert.assertEquals(writer.getLastWrittenRecordId(), reader.getPosition());
+    write(writer, k + 1, 2 * k);
+    Assert.assertEquals(writer.getLastWrittenRecordId(), 2 * k);
+    writer.close();
+    reader = new ArchiveHdfsLogReader(conf,
+        LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
+    verifyRecords(reader, 2 * k);
+
+    files = ((ArchiveHdfsLogReader) reader).getFiles().stream()
+        .map(fileStatus -> fileStatus.getPath().getName()).toArray(String[]::new);
+    String[] expectedFiles1 =
+        { logName.getName() + "_recordId_" + k, logName.getName() + "_recordId_" + 2 * k };
+    Assert.assertArrayEquals(expectedFiles1, files);
+    reader.close();
+  }
+
+  private void verifyRecords(ArchiveLogReader reader, int n) throws IOException {
+    for (int i = 1; i <= n; i++) {
+      Assert.assertEquals(i, ByteBuffer.wrap(reader.next()).getInt());
+    }
+    Assert.assertFalse(reader.hasNext());
+    Assert.assertTrue(reader.next() == null);
+    try {
+      reader.readNext();
+      Assert.fail();
+    } catch (NoSuchElementException e) {
+      //expected
+    }
+  }
+
+  private void write(ArchiveLogWriter writer, int start, int end) throws IOException {
+    for (Integer i = start; i <= end; i++) {
+      writer.write(ByteBuffer.allocate(4).putInt(i));
+    }
+  }
+
+  @Test public void testCorruptedFileEOF() throws IOException {
+    FSDataOutputStream fos = FileSystem.get(conf).create(new Path(location,"testEOF"));
+    fos.write(ByteBuffer.allocate(4).putInt(4).array());
+    fos.write(new byte[4]);
+    fos.write(ByteBuffer.allocate(4).putInt(4).array());
+    // but data is just 2 bytes
+    fos.write(new byte[2]);
+    fos.close();
+    ArchiveLogReader reader = new ArchiveHdfsLogReader(conf, location+"/testEOF");
+    try {
+      reader.next();
+      Assert.fail();
+    } catch (EOFException e) {
+      //expected
+    }
+
+  }
+
+  @Test public void testSeek() throws IOException {
+    String archiveLocation = location+"/testSeek";
+    LogName logName = LogName.of("testSeek");
+    DistributedFileSystem fs = cluster.getFileSystem();
+    fs.delete(new Path(archiveLocation), true);
+    ArchiveLogWriter writer = new ArchiveHdfsLogWriter(conf);
+    writer.init(archiveLocation, logName);
+    int k = 100;
+    write(writer, 1, k);
+    writer.close();
+    ArchiveLogReader reader = new ArchiveHdfsLogReader(conf,
+        LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
+    reader.seek(80);
+    Assert.assertEquals(80, reader.getPosition());
+    int count = 0;
+    while (reader.next() != null) {
+      count++;
+    }
+    Assert.assertEquals(20, count);
+  }
+
+  @AfterClass
+  public static void teardownafterclass(){
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    deleteLocalDirectory(new File(location));
+  }
+
+  static boolean deleteLocalDirectory(File dir) {
+    File[] allFiles = dir.listFiles();
+    if (allFiles != null) {
+      for (File file : allFiles) {
+        deleteLocalDirectory(file);
+      }
+    }
+    return dir.delete();
+  }
+
+}
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 685cf9b..886f058 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -19,13 +19,14 @@
 package org.apache.ratis.logservice.server;
 
 import org.apache.ratis.logservice.api.*;
+import org.apache.ratis.logservice.api.LogStream.State;
 import org.apache.ratis.logservice.client.LogServiceClient;
-import org.apache.ratis.logservice.common.Constants;
 import org.apache.ratis.logservice.common.LogAlreadyExistException;
 import org.apache.ratis.logservice.common.LogNotFoundException;
 import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
 import org.apache.ratis.logservice.proto.MetaServiceProtos;
 import org.apache.ratis.logservice.util.LogServiceCluster;
+import org.apache.ratis.logservice.util.TestUtils;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.junit.AfterClass;
@@ -33,11 +34,11 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
@@ -70,6 +71,7 @@ public class TestMetaServer {
             listCount.incrementAndGet();
             return super.listLogs();
         }
+
     };
     @BeforeClass
     public static void beforeClass() {
@@ -92,12 +94,10 @@ public class TestMetaServer {
      */
     @Test
     public void testCreateAndGetLog() throws Exception {
-
         // This should be LogServiceStream ?
         LogStream logStream1 = client.createLog(LogName.of("testCreateLog"));
         assertNotNull(logStream1);
         LogStream logStream2 = client.getLog(LogName.of("testCreateLog"));
-        testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.GETLOG.name(),1l);
         assertNotNull(logStream2);
     }
 
@@ -126,6 +126,98 @@ public class TestMetaServer {
         assert(res.array().length > 0);
     }
 
+    @Test
+    public void testLogArchival() throws IOException, InterruptedException {
+        LogName logName = LogName.of("testArchivalLog");
+        LogStream logStream = client.createLog(logName);
+        LogWriter writer = logStream.createWriter();
+        List<LogInfo> listLogs = client.listLogs();
+        assert (listLogs.stream()
+            .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
+        List<LogServer> workers = cluster.getWorkers();
+        List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
+        writer.write(records);
+        client.closeLog(logName);
+        assertEquals(logStream.getState(), State.CLOSED);
+        client.archiveLog(logName);
+        int retry = 0;
+        while (logStream.getState() != State.ARCHIVED && retry <= 40) {
+            Thread.sleep(1000);
+            retry++;
+        }
+        assertEquals(logStream.getState(), State.ARCHIVED);
+        LogReader reader = logStream.createReader();
+        List<ByteBuffer> data = reader.readBulk(records.size());
+        assertEquals(records.size(), data.size());
+        reader.seek(1);
+        data = reader.readBulk(records.size());
+        assertEquals(records.size() - 1, data.size());
+
+        //Test ArchiveLogStream
+        LogServiceConfiguration config = LogServiceConfiguration.create();
+        LogStream archiveLogStream = client.getArchivedLog(logName);
+        reader = archiveLogStream.createReader();
+        data = reader.readBulk(records.size());
+        assertEquals(records.size(), data.size());
+    }
+
+    @Test
+    public void testLogExport() throws IOException, InterruptedException {
+        LogName logName = LogName.of("testLogExport");
+        LogStream logStream = client.createLog(logName);
+        LogWriter writer = logStream.createWriter();
+        List<LogInfo> listLogs = client.listLogs();
+        assert (listLogs.stream()
+            .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
+        List<LogServer> workers = cluster.getWorkers();
+        List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
+        writer.write(records);
+        String location1 = "target/tmp/export_1/";
+        String location2 = "target/tmp/export_2/";
+        deleteLocalDirectory(new File(location1));
+        deleteLocalDirectory(new File(location2));
+        int startPosition1 = 3;
+        int startPosition2 = 5;
+        client.exportLog(logName, location1, startPosition1);
+        client.exportLog(logName, location2, startPosition2);
+        List<ArchivalInfo> infos=client.getExportStatus(logName);
+        int count=0;
+        while (infos.size() > 1 && (
+            infos.get(0).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED
+                || infos.get(1).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED)
+            && count < 10) {
+            infos = client.getExportStatus(logName);
+            ;
+            Thread.sleep(1000);
+            count++;
+
+        }
+
+        //Test ExportLogStream
+        LogStream exportLogStream = client.getExportLog(logName, location1);
+        LogReader reader = exportLogStream.createReader();
+        List<ByteBuffer> data = reader.readBulk(records.size());
+        assertEquals(records.size() - startPosition1, data.size());
+        reader.close();
+        exportLogStream = client.getExportLog(logName, location2);
+        reader = exportLogStream.createReader();
+        data = reader.readBulk(records.size());
+        assertEquals(records.size() - startPosition2, data.size());
+        reader.close();
+        writer.close();
+    }
+
+    boolean deleteLocalDirectory(File dir) {
+        File[] allFiles = dir.listFiles();
+        if (allFiles != null) {
+            for (File file : allFiles) {
+                deleteLocalDirectory(file);
+            }
+        }
+        return dir.delete();
+    }
+
+
     /**
      * Test for Delete operation
      * @throws IOException
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestLogServiceProtoUtil.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestLogServiceProtoUtil.java
index 4f0c658..3187868 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestLogServiceProtoUtil.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestLogServiceProtoUtil.java
@@ -165,7 +165,7 @@ public class TestLogServiceProtoUtil {
   @Ignore
   public void testGetStateReply() {
     LogStream logStream = null;
-    GetStateReplyProto proto = LogServiceProtoUtil.toGetStateReplyProto(true);
+    GetStateReplyProto proto = LogServiceProtoUtil.toGetStateReplyProto(LogStream.State.OPEN);
     //TODO finish
 
   }
@@ -175,8 +175,9 @@ public class TestLogServiceProtoUtil {
   @Test
   public void testCloseLogRequest() {
     LogName name = LogName.of("test");
-    LogServiceRequestProto proto = LogServiceProtoUtil.toCloseLogRequestProto(name);
-    CloseLogRequestProto request = proto.getCloseLog();
+    LogServiceRequestProto proto = LogServiceProtoUtil.toChangeStateRequestProto(name,
+        LogStream.State.CLOSED);
+    ChangeStateLogRequestProto request = proto.getChangeState();
     assertEquals(name.getName(), request.getLogName().getName());
     //TODO finish
   }
diff --git a/ratis-logservice/src/test/resources/logservice.xml b/ratis-logservice/src/test/resources/logservice.xml
new file mode 100644
index 0000000..a509774
--- /dev/null
+++ b/ratis-logservice/src/test/resources/logservice.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>logservice.archival.location</name>
+    <value>target/tmp/archive_1/</value>
+    <description>Base directory where logs are archived</description>
+  </property>
+</configuration>
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 705876e..bc8961e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -45,8 +45,8 @@ import java.util.concurrent.atomic.AtomicReference;
  * Base implementation for StateMachines.
  */
 public class BaseStateMachine implements StateMachine {
-  private final CompletableFuture<RaftServer> server = new CompletableFuture<>();
-  private volatile RaftGroupId groupId;
+  protected final CompletableFuture<RaftServer> server = new CompletableFuture<>();
+  protected volatile RaftGroupId groupId;
   protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
 
   private final AtomicReference<TermIndex> lastAppliedTermIndex = new AtomicReference<>();

Reply via email to