RATIS-369: LogService read/write path (Phase 2)

Signed-off-by: Josh Elser <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/b901b3a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/b901b3a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/b901b3a5

Branch: refs/heads/master
Commit: b901b3a5a95a0128569d3a660fd0af24a7ee83e4
Parents: e308cf5
Author: Vladimir Rodionov <[email protected]>
Authored: Fri Oct 26 11:35:45 2018 -0700
Committer: Josh Elser <[email protected]>
Committed: Fri Oct 26 15:40:57 2018 -0400

----------------------------------------------------------------------
 .../ratis/logservice/LogServiceFactory.java     |  48 ----
 .../apache/ratis/logservice/api/LogMessage.java |  13 --
 .../apache/ratis/logservice/api/LogService.java | 156 -------------
 .../ratis/logservice/api/LogStateMachine.java   | 222 ++++++++-----------
 .../apache/ratis/logservice/api/LogStream.java  |  23 +-
 .../logservice/client/LogServiceClient.java     |  57 ++++-
 .../ratis/logservice/dummy/DummyLogReader.java  |  76 -------
 .../ratis/logservice/dummy/DummyLogService.java | 106 ---------
 .../ratis/logservice/dummy/DummyLogStream.java  | 108 ---------
 .../ratis/logservice/dummy/DummyLogWriter.java  |  51 -----
 .../ratis/logservice/impl/LogReaderImpl.java    | 139 +++++++-----
 .../ratis/logservice/impl/LogServiceImpl.java   | 158 -------------
 .../ratis/logservice/impl/LogStreamImpl.java    |  85 +++++--
 .../ratis/logservice/impl/LogWriterImpl.java    |  73 +++---
 .../logservice/server/MetaStateMachine.java     |  49 ++--
 .../logservice/util/LogServiceProtoUtil.java    |  48 ++--
 .../src/main/proto/LogService.proto             |  14 +-
 .../ratis/logservice/LogServiceBaseTest.java    |  45 ++--
 .../logservice/LogServiceReadWriteBase.java     | 131 +++++++++++
 .../logservice/TestLogServiceWithGrpc.java      |   4 +
 .../logservice/TestLogServiceWithNetty.java     |   2 +
 .../ratis/logservice/api/TestApiExample.java    |  67 ------
 .../ratis/logservice/server/TestMetaServer.java |  29 +--
 .../logservice/util/LogServiceCluster.java      |   6 +-
 .../util/TestLogServiceProtoUtil.java           |   2 +-
 25 files changed, 592 insertions(+), 1120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
deleted file mode 100644
index 1b9966a..0000000
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice;
-
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.logservice.api.LogService;
-import org.apache.ratis.logservice.api.LogServiceConfiguration;
-import org.apache.ratis.logservice.impl.LogServiceImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LogServiceFactory {
-  public static final Logger LOG = 
LoggerFactory.getLogger(LogServiceFactory.class);
-  private static final LogServiceFactory INSTANCE = new LogServiceFactory();
-
-  private LogServiceFactory() {}
-
-  /**
-   * Creates an implementation of {@link LogService} using the given {@link 
RaftClient}.
-   *
-   * @param raftClient The client to a Raft quorum.
-   */
-  public LogService createLogService(RaftClient raftClient, 
LogServiceConfiguration config) {
-    return new LogServiceImpl(raftClient, config);
-  }
-
-  /**
-   * Returns an instance of the factory to create {@link LogService} instances.
-   */
-  public static LogServiceFactory getInstance() {
-    return INSTANCE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
index 3d3bb56..82ca592 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
@@ -22,13 +22,6 @@ import org.apache.ratis.protocol.Message;
 public abstract class LogMessage implements Message {
 
   /*
-   * Type of messages
-   */
-  public static enum Type {
-    APPEND, CLOSE, SYNC, GET_START_INDEX, READ, GET_LENGTH
-  }
-
-  /*
    * Log name
    */
   protected LogName logName;
@@ -41,10 +34,4 @@ public abstract class LogMessage implements Message {
     return logName;
   }
 
-  /**
-   * Get message type
-   * @return message type
-   */
-  public abstract Type getType();
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
deleted file mode 100644
index de47c62..0000000
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.api;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.logservice.api.LogStream.State;
-
-/**
- * Entry point for interacting with the Ratis LogService.
- */
-public interface LogService extends AutoCloseable {
-  /*
-   * How to create a LogStream
-   */
-
-  /**
-   * Creates a new {@link LogStream} identified by the given name with default
-   * configuration. Throws an exception if a {@link LogStream} with the given
-   * name already exists.
-   *
-   * @param name Unique name for this LogStream.
-   */
-  LogStream createLog(LogName name) throws IOException;
-
-  /**
-   * Creates a new {@link LogStream} identified by the given name. Throws
-   * an exception if a {@link LogStream} with the given name already exists.
-   *
-   * @param name Unique name for this LogStream.
-   * @param config Configuration object for this LogStream
-   */
-  LogStream createLog(LogName name, LogServiceConfiguration config) throws 
IOException;
-
-  /*
-   * How to get LogStreams that already exist
-   */
-  /**
-   * Fetches the {@link LogStream} identified by the given name.
-   *
-   * @param name The name of the LogStream
-   */
-  LogStream getLog(LogName name) throws IOException;
-
-  /**
-   * Lists all {@link LogStream} instances known by this LogService.
-   */
-  Iterator<LogStream> listLogs() throws IOException;
-
-  /*
-   * How to close, archive, and delete LogStreams
-   */
-
-  /**
-   * Moves the {@link LogStream} identified by the {@code name} from {@link 
State.OPEN} to {@link State.CLOSED}.
-   * If the log is not {@link State#OPEN}, this method returns an error.
-   *
-   * @param name The name of the log to close
-   */
-  // TODO this name sucks, confusion WRT the Java Closeable interface.
-  void closeLog(LogName name) throws IOException;
-
-  /**
-   * Returns the current {@link State} of the log identified by {@code name}.
-   *
-   * @param name The name of a log
-   */
-  State getState(LogName name) throws IOException;
-
-  /**
-   * Archives the given log out of the state machine and into a configurable 
long-term storage. A log must be
-   * in {@link State#CLOSED} to archive it.
-   *
-   * @param name The name of the log to archive.
-   */
-  void archiveLog(LogName name) throws IOException;
-
-  /**
-   * Deletes the {@link LogStream}.
-   * @param name The name of the LogStream
-   */
-  void deleteLog(LogName name) throws IOException;
-
-  /*
-   * Change the configuration of a LogStream or manipulate a LogStream's 
listeners
-   */
-
-  /**
-   * Updates a log with the new configuration object, overriding
-   * the previous configuration.
-   *
-   * @param config The new configuration object
-   */
-  void updateConfiguration(LogName name, LogServiceConfiguration config);
-
-  /**
-   * Registers a {@link RecordListener} with the log which will receive all 
records written using
-   * the unique name provided by {@link RecorderListener#getName()}.
-   *
-   * Impl spec: The name returned by a {@link RecordListener} instance 
uniquely identifies it against other
-   * instances.
-   *
-   * @param the log's name
-   * @param listener The listener to register
-   */
-  void addRecordListener(LogName name, RecordListener listener);
-
-  /**
-   * Removes a {@link RecordListener) for the log.
-   *
-   * Impl spec: The name returned by a {@link RecordListener} instance 
uniquely identifies it against
-   * other instances.
-   *
-   * @param the log's name
-   * @param listener The listener to remove
-   * @return
-   */
-  boolean removeRecordListener(LogName name, RecordListener listener);
-
-  /**
-   * Overrides {@link #close()} in {@link AutoCloseable} to throw an 
IOException.
-   */
-  @Override
-  void close() throws IOException;
-
-  /**
-   * Gets Raft client object
-   * @return raft client object
-   */
-  RaftClient getRaftClient();
-
-  /**
-   * Gets configuration
-   * @return configuration
-   */
-  LogServiceConfiguration getConfiguration();
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
index 1617c94..c72e1fc 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
@@ -26,15 +26,11 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.ratis.logservice.impl.LogStreamImpl;
+import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.proto.LogServiceProtos.*;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.proto.RaftProtos;
@@ -60,10 +56,18 @@ import org.slf4j.LoggerFactory;
 
 public class LogStateMachine extends BaseStateMachine {
   public static final Logger LOG = 
LoggerFactory.getLogger(LogStateMachine.class);
+
+  public static enum State {
+    OPEN, CLOSED
+  }
+
   /*
-   *  State
+   *  State is a pair log's length and state (closed/open);
    */
-  private final Map<LogName, Long> state = new ConcurrentHashMap<>();
+
+  private long length;
+
+  private State state = State.OPEN;
 
   private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
 
@@ -87,7 +91,7 @@ public class LogStateMachine extends BaseStateMachine {
    * Reset state machine
    */
   void reset() {
-    state.clear();
+    this.length = 0;
     setLastAppliedTermIndex(null);
   }
 
@@ -116,19 +120,19 @@ public class LogStateMachine extends BaseStateMachine {
 
   @Override
   public long takeSnapshot() {
-    final Map<LogName, Long> copy;
     final TermIndex last;
     try(final AutoCloseableLock readLock = readLock()) {
-      copy = new HashMap<>(state);
       last = getLastAppliedTermIndex();
     }
 
     final File snapshotFile =  storage.getSnapshotFile(last.getTerm(), 
last.getIndex());
     LOG.info("Taking a snapshot to file {}", snapshotFile);
 
-    try(final ObjectOutputStream out = new ObjectOutputStream(
+    try(final AutoCloseableLock readLock = readLock();
+        final ObjectOutputStream out = new ObjectOutputStream(
         new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
-      out.writeObject(copy);
+      out.writeLong(length);
+      out.writeObject(state);
     } catch(IOException ioe) {
       LOG.warn("Failed to write snapshot file \"" + snapshotFile
           + "\", last applied index=" + last);
@@ -141,7 +145,6 @@ public class LogStateMachine extends BaseStateMachine {
     return load(snapshot, false);
   }
 
-  @SuppressWarnings("unchecked")
   private long load(SingleFileSnapshotInfo snapshot, boolean reload) throws 
IOException {
     if (snapshot == null) {
       LOG.warn("The snapshot info is null.");
@@ -161,7 +164,8 @@ public class LogStateMachine extends BaseStateMachine {
         reset();
       }
       setLastAppliedTermIndex(last);
-      state.putAll((Map<LogName, Long>) in.readObject());
+      this.length = in.readLong();
+      this.state = (State) in.readObject();
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
     }
@@ -192,6 +196,8 @@ public class LogStateMachine extends BaseStateMachine {
           return processGetStartIndexRequest(logServiceRequestProto);
         case GETSTATE:
           return processGetStateRequest(logServiceRequestProto);
+        case LASTINDEXQUERY:
+          return processGetLastCommittedIndexRequest(logServiceRequestProto);
         default:
           // TODO
           throw new RuntimeException(
@@ -213,9 +219,26 @@ public class LogStateMachine extends BaseStateMachine {
   private CompletableFuture<Message>
       processGetStartIndexRequest(LogServiceRequestProto proto)
   {
-    long startIndex =log.getStartIndex();
+
+    Throwable t = verifyState(State.OPEN);
+    long startIndex = log.getStartIndex();
     return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toGetLogStartIndexReplyProto(startIndex, 
null).toByteString()));
+      .valueOf(LogServiceProtoUtil.toGetLogStartIndexReplyProto(startIndex, 
t).toByteString()));
+  }
+
+  /**
+   * Process get last committed record index
+   * @param msg message
+   * @return reply message
+   */
+  private CompletableFuture<Message>
+      processGetLastCommittedIndexRequest(LogServiceRequestProto proto)
+  {
+
+    Throwable t = verifyState(State.OPEN);
+    long lastIndex = log.getLastCommittedIndex();
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toGetLogLastIndexReplyProto(lastIndex, 
t).toByteString()));
   }
 
   /**
@@ -224,18 +247,11 @@ public class LogStateMachine extends BaseStateMachine {
    * @return reply message
    */
   private CompletableFuture<Message> 
processGetLengthRequest(LogServiceRequestProto proto) {
-    Long len = null;
     GetLogLengthRequestProto msgProto = proto.getLengthQuery();
-    LogName logName = LogName.of(msgProto.getLogName().getName());
-    try(final AutoCloseableLock readLock = readLock()) {
-      len = state.get(logName);
-      if (len == null) {
-        len = new Long(-1);
-      }
-    }
-    LOG.debug("QUERY: {}, RESULT: {}", msgProto, len);
+    Throwable t = verifyState(State.OPEN);
+    LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.length);
     return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toGetLogLengthReplyProto(len, 
null).toByteString()));
+      .valueOf(LogServiceProtoUtil.toGetLogLengthReplyProto(this.length, 
t).toByteString()));
   }
 
   /**
@@ -248,19 +264,30 @@ public class LogStateMachine extends BaseStateMachine {
     ReadLogRequestProto msgProto = proto.getReadNextQuery();
     long startRecordId = msgProto.getStartRecordId();
     int num = msgProto.getNumRecords();
-    Throwable t = null;
+    Throwable t = verifyState(State.OPEN);
     List<byte[]> list = new ArrayList<byte[]>();
-    for (long index = startRecordId; index < startRecordId + num; index++) {
-      try {
-        
list.add(log.getEntryWithData(index).getEntry().getStateMachineLogEntry().getLogData().toByteArray());
-      } catch(RaftLogIOException e) {
-        t = e;
-        list = null;
-        break;
+    LOG.info("Start Index: {}", startRecordId);
+    LOG.info("Total to read: {}", num);
+    if (t == null) {
+      for (long index = startRecordId; index < startRecordId + num; index++) {
+        try {
+          LogEntryProto entry = log.get(index);
+          LOG.info("Index: {} Entry: {}", index, entry);
+          if (entry == null || entry.hasConfigurationEntry()) {
+            continue;
+          }
+          //TODO: how to distinguish log records from
+          // DML commands logged by the service?
+          list.add(entry.getStateMachineLogEntry().getLogData().toByteArray());
+        } catch (RaftLogIOException e) {
+          t = e;
+          list = null;
+          break;
+        }
       }
     }
-    return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toReadLogReplyProto(list, 
t).toByteString()));
+    return CompletableFuture.completedFuture(
+      Message.valueOf(LogServiceProtoUtil.toReadLogReplyProto(list, 
t).toByteString()));
   }
 
   /**
@@ -271,10 +298,10 @@ public class LogStateMachine extends BaseStateMachine {
    */
   private CompletableFuture<Message> processSyncRequest(TransactionContext trx,
       LogServiceRequestProto logMessage) {
-
-    // TODO
+     long index = trx.getLogEntry().getIndex();
+    // TODO: Do we really need this call?
     return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toSyncLogReplyProto(null).toByteString()));
+      .valueOf(LogServiceProtoUtil.toSyncLogReplyProto(index, 
null).toByteString()));
 
   }
 
@@ -284,31 +311,28 @@ public class LogStateMachine extends BaseStateMachine {
     final LogEntryProto entry = trx.getLogEntry();
     AppendLogEntryRequestProto proto = logProto.getAppendRequest();
     final long index = entry.getIndex();
-    Long val = null;
-    LogName name = null;
     long total = 0;
-    try (final AutoCloseableLock writeLock = writeLock()) {
-      name = LogServiceProtoUtil.toLogName(proto.getLogName());
-      List<byte[]> entries = 
LogServiceProtoUtil.toListByteArray(proto.getDataList());
-      for (byte[] bb : entries) {
-        total += bb.length;
+    Throwable t = verifyState(State.OPEN);
+    if (t == null) {
+      try (final AutoCloseableLock writeLock = writeLock()) {
+          List<byte[]> entries = 
LogServiceProtoUtil.toListByteArray(proto.getDataList());
+          for (byte[] bb : entries) {
+            total += bb.length;
+          }
+          this.length += total;
+          // TODO do we need this for other write request (close, sync)
+          updateLastAppliedTermIndex(entry.getTerm(), index);
       }
-      val = state.get(name);
-      if (val == null) {
-        val = new Long(0);
-      }
-      state.put(name, val + total);
-      // TODO do we need this for other write request (close, sync)
-      updateLastAppliedTermIndex(entry.getTerm(), index);
     }
+    List<Long> ids = new ArrayList<Long>();
+    ids.add(index);
     final CompletableFuture<Message> f =
-        // TODO record ids?
-        CompletableFuture.completedFuture(Message
-          .valueOf(LogServiceProtoUtil.toAppendLogReplyProto(null, 
null).toByteString()));
+        CompletableFuture.completedFuture(
+          Message.valueOf(LogServiceProtoUtil.toAppendLogReplyProto(ids, 
t).toByteString()));
     final RaftProtos.RaftPeerRole role = trx.getServerRole();
-    LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, val);
+    LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, 
length);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{}-{}: variables={}", getId(), index, state);
+      LOG.trace("{}-{}: variables={}", getId(), index, length);
     }
     return f;
   }
@@ -326,15 +350,8 @@ public class LogStateMachine extends BaseStateMachine {
       LogServiceRequestProto logServiceRequestProto =
           
LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
       switch (logServiceRequestProto.getRequestCase()) {
-
-//        case CREATELOG:
-//          return processCreateLogRequest(logServiceRequestProto);
-//        case ARCHIVELOG:
-//          return processArchiveLog(logServiceRequestProto);
         case CLOSELOG:
           return processCloseLog(logServiceRequestProto);
-//        case DELETELOG:
-//          return processDeleteLog(logServiceRequestProto);
         case APPENDREQUEST:
           return processAppendRequest(trx, logServiceRequestProto);
         case SYNCREQUEST:
@@ -349,17 +366,7 @@ public class LogStateMachine extends BaseStateMachine {
     }
   }
 
-//  private CompletableFuture<Message>
-//      processDeleteLog(LogServiceRequestProto logServiceRequestProto) {
-//    DeleteLogRequestProto deleteLog = logServiceRequestProto.getDeleteLog();
-//    LogName logName = LogServiceProtoUtil.toLogName(deleteLog.getLogName());
-//    try (final AutoCloseableLock writeLock = writeLock()) {
-//      state.remove(logName);
-//    }
-//    // TODO need to handle exceptions while operating with files.
-//    return CompletableFuture.completedFuture(Message
-//      .valueOf(DeleteLogReplyProto.newBuilder().build().toByteString()));
-//  }
+
 
   private CompletableFuture<Message> processCloseLog(LogServiceRequestProto 
logServiceRequestProto) {
     CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
@@ -370,65 +377,22 @@ public class LogStateMachine extends BaseStateMachine {
       .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
   }
 
-//  private CompletableFuture<Message>
-//      processArchiveLog(LogServiceRequestProto logServiceRequestProto) {
-//    ArchiveLogRequestProto archiveLog = 
logServiceRequestProto.getArchiveLog();
-//    LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
-//    // Handle log archiving.
-//    return CompletableFuture.completedFuture(Message
-//      .valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString()));
-//  }
+
 
   private CompletableFuture<Message> processGetStateRequest(
       LogServiceRequestProto logServiceRequestProto) {
     GetStateRequestProto getState = logServiceRequestProto.getGetState();
     LogName logName = LogServiceProtoUtil.toLogName(getState.getLogName());
     return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-        .toGetStateReplyProto(state.containsKey(logName)).toByteString()));
+        .toGetStateReplyProto(state == State.OPEN).toByteString()));
   }
 
-//  private CompletableFuture<Message> processCreateLogRequest(
-//      LogServiceRequestProto logServiceRequestProto) {
-//    Long val;
-//    LogName name;
-//    try (final AutoCloseableLock writeLock = writeLock()) {
-//      CreateLogRequestProto createLog = 
logServiceRequestProto.getCreateLog();
-//      name = LogServiceProtoUtil.toLogName(createLog.getLogName());
-//      val = state.get(name);
-//      if (val == null) {
-//        val = new Long(0);
-//      }
-//      state.put(name, val);
-//    }
-//    //TODO This can't be part of a state machine (REMOVE)
-//    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-//        .toCreateLogReplyProto(
-//          new LogStreamImpl(name, null, new 
LogServiceConfiguration())).toByteString()));
-//  }
-
-//  //TODO REMOVE this code
-//  private CompletableFuture<Message> processListLogsRequest() {
-//    List<LogStream> logStreams = new ArrayList<LogStream>(state.size());
-//    for (Entry<LogName, Long> e : state.entrySet()) {
-//      logStreams.add(new LogStreamImpl(e.getKey(), null, new 
LogServiceConfiguration()));
-//    }
-//    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-//        .toListLogLogsReplyProto(logStreams).toByteString()));
-//  }
-
-  //TODO REMOVE this code
-
-//  private CompletableFuture<Message> processGetLogRequest(
-//      LogServiceRequestProto logServiceRequestProto) {
-//    GetLogRequestProto getLog = logServiceRequestProto.getGetLog();
-//    LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName());
-//    if (state.containsKey(logName)) {
-//      return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-//          .toGetLogReplyProto(new LogStreamImpl(logName, null, new 
LogServiceConfiguration()))
-//          .toByteString()));
-//    } else {
-//      return 
CompletableFuture.completedFuture(Message.valueOf(GetLogReplyProto.newBuilder()
-//          .build().toByteString()));
-//    }
-//  }
+  private Throwable verifyState(State state) {
+       if (this.state != state) {
+          return new IOException("Wrong state: " + this.state);
+        }
+        return null;
+   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
index 015b90c..fbb977f 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
@@ -17,8 +17,10 @@
  */
 package org.apache.ratis.logservice.api;
 
+import java.io.IOException;
 import java.util.Collection;
-import java.util.Set;
+
+import org.apache.ratis.client.RaftClient;
 
 /**
  * A distributed log with "infinite" length that supports reads and writes.
@@ -45,8 +47,9 @@ public interface LogStream extends AutoCloseable{
 
   /**
    * Returns the size of this LogStream in bytes.
+   * @throws IOException
    */
-  long getSize();
+  long getSize() throws IOException;
 
   /**
    * Creates a reader to read this LogStream.
@@ -64,8 +67,15 @@ public interface LogStream extends AutoCloseable{
 
   /**
    * Returns the recordId of the last record in this LogStream. For an empty 
log, the recordId is {@code 0}.
+   * @throws IOException
    */
-  long getLastRecordId();
+  long getLastRecordId() throws IOException;
+
+  /**
+   * Returns the recordId of the first record in this LogStream. For an empty 
log, the recordId is {@code 0}.
+   * @throws IOException
+   */
+  long getStartRecordId() throws IOException;
 
   /**
    * Returns all {@link RecordListeners} for this LogStream.
@@ -92,8 +102,9 @@ public interface LogStream extends AutoCloseable{
   boolean removeRecordListener (RecordListener listener);
 
   /**
-   * Get log service
-   * @return log service instance
+   * Get Raft Client
+   * @return Raft client
    */
-  LogService getLogService();
+
+  RaftClient getRaftClient();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
index 8ceaafb..5bdfb93 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
@@ -20,11 +20,13 @@ package org.apache.ratis.logservice.client;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.logservice.LogServiceFactory;
 import org.apache.ratis.logservice.api.LogInfo;
 import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.api.LogService;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogStream.State;
 import org.apache.ratis.logservice.common.Constants;
+import org.apache.ratis.logservice.impl.LogStreamImpl;
 import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
 import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
 import org.apache.ratis.protocol.*;
@@ -48,13 +50,22 @@ public class LogServiceClient implements AutoCloseable {
 
     // the raft client for meta quorum. All DML operations are going using 
this client.
     final private RaftClient client;
-
+    final private LogServiceConfiguration config;
 
     /**
      * Constuctor. Build raft client for meta quorum
      * @param metaQuorum
      */
     public LogServiceClient(String metaQuorum) {
+        this(metaQuorum, new LogServiceConfiguration());
+    }
+
+    /**
+     * Constuctor (with configuration). Build raft client for meta quorum
+     * @param metaQuorum
+     * @param config log serice configuration
+     */
+    public LogServiceClient(String metaQuorum, LogServiceConfiguration config) 
{
         Set<RaftPeer> peers = getPeersFromQuorum(metaQuorum);
         RaftProperties properties = new RaftProperties();
         RaftGroup meta = RaftGroup.valueOf(Constants.metaGroupID, peers);
@@ -63,6 +74,7 @@ public class LogServiceClient implements AutoCloseable {
                 .setClientId(ClientId.randomId())
                 .setProperties(properties)
                 .build();
+        this.config = config;
     }
 
     /**
@@ -71,7 +83,7 @@ public class LogServiceClient implements AutoCloseable {
      * @return
      * @throws IOException
      */
-    public LogService createLog(LogName logName) throws IOException {
+    public LogStream createLog(LogName logName) throws IOException {
         RaftClientReply reply = client.sendReadOnly(
                 () -> 
MetaServiceProtoUtil.toCreateLogRequestProto(logName).toByteString());
         CreateLogReplyProto message = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
@@ -79,7 +91,7 @@ public class LogServiceClient implements AutoCloseable {
             throw 
MetaServiceProtoUtil.toMetaServiceException(message.getException());
         }
         LogInfo info = MetaServiceProtoUtil.toLogInfo(message.getLog());
-        return 
LogServiceFactory.getInstance().createLogService(getRaftClient(info), null);
+        return new LogStreamImpl(logName, getRaftClient(info), config);
     }
 
     /**
@@ -88,7 +100,7 @@ public class LogServiceClient implements AutoCloseable {
      * @return
      * @throws IOException
      */
-    public LogService getLog(LogName logName) throws IOException {
+    public LogStream getLog(LogName logName) throws IOException {
         RaftClientReply reply = client.sendReadOnly
                 (() -> 
MetaServiceProtoUtil.toGetLogRequestProto(logName).toByteString());
         GetLogReplyProto message = 
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
@@ -96,7 +108,7 @@ public class LogServiceClient implements AutoCloseable {
             throw 
MetaServiceProtoUtil.toMetaServiceException(message.getException());
         }
         LogInfo info = MetaServiceProtoUtil.toLogInfo(message.getLog());
-        return 
LogServiceFactory.getInstance().createLogService(getRaftClient(info), null);
+        return new LogStreamImpl(logName, getRaftClient(info), config);
     }
 
 
@@ -143,4 +155,35 @@ public class LogServiceClient implements AutoCloseable {
 
     }
 
+    /**
+     * Archives the given log out of the state machine and into a configurable 
long-term storage. A log must be
+     * in {@link State#CLOSED} to archive it.
+     *
+     * @param name The name of the log to archive.
+     */
+    void archiveLog(LogName name) throws IOException {
+      // TODO: write me
+    }
+
+    /**
+     * Moves the {@link LogStream} identified by the {@code name} from {@link 
State.OPEN} to {@link State.CLOSED}.
+     * If the log is not {@link State#OPEN}, this method returns an error.
+     *
+     * @param name The name of the log to close
+     */
+    // TODO this name sucks, confusion WRT the Java Closeable interface.
+    void closeLog(LogName name) throws IOException {
+      //TODO: write me
+    }
+
+    /**
+     * Updates a log with the new configuration object, overriding
+     * the previous configuration.
+     *
+     * @param config The new configuration object
+     */
+    void updateConfiguration(LogName name, LogServiceConfiguration config) {
+      //TODO: write me
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogReader.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogReader.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogReader.java
deleted file mode 100644
index 71d2164..0000000
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogReader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.dummy;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.ratis.logservice.api.LogReader;
-
-public class DummyLogReader implements LogReader {
-  private static final byte[] IMMUTABLE_BYTES = new byte[0];
-
-  @Override
-  public void close() {}
-
-  @Override
-  public void seek(long recordId) throws IOException {
-    // Noop.
-    return;
-  }
-
-  @Override
-  public ByteBuffer readNext() throws IOException {
-    return ByteBuffer.wrap(IMMUTABLE_BYTES);
-  }
-
-  @Override
-  public List<ByteBuffer> readBulk(int numRecords) throws IOException {
-    ArrayList<ByteBuffer> records = new ArrayList<>(numRecords);
-    for (int i = 0; i < numRecords; i++) {
-      records.add(ByteBuffer.wrap(IMMUTABLE_BYTES));
-    }
-    return records;
-  }
-
-  @Override
-  public void readNext(ByteBuffer buffer) throws IOException {
-    buffer.clear();
-    if (buffer.remaining() < IMMUTABLE_BYTES.length) {
-      throw new IllegalArgumentException("Cannot read data into buffer of size 
" + buffer.remaining());
-    }
-    buffer.put(IMMUTABLE_BYTES);
-    buffer.flip();
-  }
-
-  @Override
-  public int readBulk(List<ByteBuffer> buffers) throws IOException {
-    for (ByteBuffer buffer : buffers) {
-      readNext(buffer);
-    }
-    return buffers.size();
-  }
-
-  @Override
-  public long getPosition() {
-    // Always at the head of the list
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java
deleted file mode 100644
index 4393e70..0000000
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.dummy;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.api.LogService;
-import org.apache.ratis.logservice.api.LogStream;
-import org.apache.ratis.logservice.api.LogStream.State;
-import org.apache.ratis.logservice.api.LogServiceConfiguration;
-import org.apache.ratis.logservice.api.RecordListener;
-
-public class DummyLogService implements LogService {
-  final ConcurrentHashMap<LogName,Set<RecordListener>> recordListeners = new 
ConcurrentHashMap<>();
-
-  @Override
-  public LogStream createLog(LogName name) {
-    return new DummyLogStream(this, name);
-  }
-
-  @Override
-  public LogStream createLog(LogName name, LogServiceConfiguration config) {
-    return new DummyLogStream(this, name);
-  }
-
-  @Override
-  public LogStream getLog(LogName name) {
-    return new DummyLogStream(this, name);
-  }
-
-  @Override
-  public Iterator<LogStream> listLogs() {
-    return Collections.<LogStream> emptyList().iterator();
-  }
-
-  @Override public void closeLog(LogName name) {}
-
-  @Override
-  public State getState(LogName name) {
-    return State.OPEN;
-  }
-
-  @Override public void archiveLog(LogName name) {}
-
-  @Override public void deleteLog(LogName name) {}
-
-  @Override public void updateConfiguration(LogName name, 
LogServiceConfiguration config) {}
-
-  @Override public void addRecordListener(LogName name, RecordListener 
listener) {
-    recordListeners.compute(name, (key, currentValue) -> {
-      if (currentValue == null) {
-        return new HashSet<RecordListener>(Collections.singleton(listener));
-      }
-      currentValue.add(listener);
-      return currentValue;
-    });
-  }
-
-  @Override public boolean removeRecordListener(LogName name, RecordListener 
listener) {
-    Set<RecordListener> result = recordListeners.compute(name, (key, 
currentValue) -> {
-      if (currentValue == null) {
-        return null;
-      }
-      currentValue.remove(listener);
-      return currentValue;
-    });
-    return result.size() > 0;
-  }
-
-  @Override public void close() throws IOException {}
-
-  @Override
-  public RaftClient getRaftClient() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public LogServiceConfiguration getConfiguration() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java
deleted file mode 100644
index a931951..0000000
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.dummy;
-
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Set;
-
-import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.api.LogReader;
-import org.apache.ratis.logservice.api.LogService;
-import org.apache.ratis.logservice.api.LogStream;
-import org.apache.ratis.logservice.api.LogServiceConfiguration;
-import org.apache.ratis.logservice.api.LogWriter;
-import org.apache.ratis.logservice.api.RecordListener;
-
-public class DummyLogStream implements LogStream {
-  private final LogName name;
-  private final DummyLogService service;
-
-  public DummyLogStream(DummyLogService service, LogName name) {
-    this.service = Objects.requireNonNull(service);
-    this.name = Objects.requireNonNull(name);
-  }
-
-  @Override
-  public LogName getName() {
-    return name;
-  }
-
-  @Override
-  public long getSize() {
-    return 0;
-  }
-
-  @Override
-  public LogReader createReader() {
-    return new DummyLogReader();
-  }
-
-  @Override
-  public LogWriter createWriter() {
-    return new DummyLogWriter();
-  }
-
-  @Override
-  public Set<RecordListener> getRecordListeners() {
-    Set<RecordListener> listeners = service.recordListeners.get(name);
-    if (listeners == null) {
-      return Collections.emptySet();
-    }
-    return Collections.unmodifiableSet(listeners);
-  }
-
-  @Override
-  public State getState() {
-    return State.OPEN;
-  }
-
-  @Override
-  public long getLastRecordId() {
-    return 0;
-  }
-
-  @Override
-  public LogServiceConfiguration getConfiguration() {
-    return null;
-  }
-
-  @Override
-  public void close() throws Exception {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void addRecordListener(RecordListener listener) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public boolean removeRecordListener(RecordListener listener) {
-    // TODO Auto-generated method stub
-    return false;
-  }
-
-  @Override
-  public LogService getLogService() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogWriter.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogWriter.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogWriter.java
deleted file mode 100644
index c9f689d..0000000
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogWriter.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.dummy;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.ratis.logservice.api.LogWriter;
-
-public class DummyLogWriter implements LogWriter {
-  private final AtomicLong counter;
-
-  public DummyLogWriter() {
-    this.counter = new AtomicLong(-1);
-  }
-
-  @Override public void close() {}
-
-  @Override
-  public long write(ByteBuffer data) throws IOException {
-    return counter.incrementAndGet();
-  }
-
-  @Override
-  public long write(List<ByteBuffer> records) throws IOException {
-    return counter.addAndGet(records.size());
-  }
-
-  @Override
-  public long sync() throws IOException {
-    return counter.get();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
index 3e1ea4d..74aa7bf 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
@@ -33,6 +33,11 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Log Reader implementation. This class is not thread-safe
+ *
+ */
+
 public class LogReaderImpl implements LogReader {
   public static final Logger LOG = 
LoggerFactory.getLogger(LogReaderImpl.class);
 
@@ -56,7 +61,7 @@ public class LogReaderImpl implements LogReader {
 
   public LogReaderImpl(LogStream logStream) {
     this.parent = logStream;
-    this.raftClient = logStream.getLogService().getRaftClient();
+    this.raftClient = logStream.getRaftClient();
     this.config = logStream.getConfiguration();
   }
 
@@ -67,77 +72,99 @@ public class LogReaderImpl implements LogReader {
 
   @Override
   public ByteBuffer readNext() throws IOException {
-    int num = 1;
-    RaftClientReply reply =
-        raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
-            .toReadLogRequestProto(parent.getName(), currentRecordId, num)
-            .toByteString()));
-    ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
-    if (proto.hasException()) {
-      LogServiceException e = proto.getException();
-      throw new IOException(e.getErrorMsg());
+
+    try {
+      RaftClientReply reply =
+          raftClient
+              .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+                  .toReadLogRequestProto(parent.getName(), currentRecordId, 
1).toByteString()));
+      ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
+      if (proto.hasException()) {
+        LogServiceException e = proto.getException();
+        throw new IOException(e.getErrorMsg());
+      }
+
+      currentRecordId++;
+
+      if (proto.getLogRecordCount() > 0) {
+        proto.getLogRecord(0);
+        return ByteBuffer.wrap(proto.getLogRecord(0).toByteArray());
+      } else {
+        return null;
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
     }
-    proto.getLogRecord(0);
-    currentRecordId++;
-    return ByteBuffer.wrap(proto.getLogRecord(0).toByteArray());
   }
 
   @Override
   public void readNext(ByteBuffer buffer) throws IOException {
-    int num = 1;
-    RaftClientReply reply =
-        raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
-            .toReadLogRequestProto(parent.getName(), currentRecordId, num)
-            .toByteString()));
-    ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
-    if (proto.hasException()) {
-      LogServiceException e = proto.getException();
-      throw new IOException(e.getErrorMsg());
+    try {
+      RaftClientReply reply =
+          raftClient
+              .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+                  .toReadLogRequestProto(parent.getName(), currentRecordId, 
1).toByteString()));
+      ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
+      if (proto.hasException()) {
+        LogServiceException e = proto.getException();
+        throw new IOException(e.getErrorMsg());
+      }
+      currentRecordId++;
+      if (proto.getLogRecordCount() > 0) {
+        // TODO limits
+        buffer.put(proto.getLogRecord(0).toByteArray());
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
     }
-    currentRecordId++;
-    //TODO limits
-    buffer.put(proto.getLogRecord(0).toByteArray());
   }
 
   @Override
   public List<ByteBuffer> readBulk(int numRecords) throws IOException {
-    RaftClientReply reply =
-        raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
-            .toReadLogRequestProto(parent.getName(), currentRecordId, 
numRecords)
-            .toByteString()));
-    ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
-    if (proto.hasException()) {
-      LogServiceException e = proto.getException();
-      throw new IOException(e.getErrorMsg());
-    }
-    //TODO correct current record
-    currentRecordId+= numRecords;
-    List<ByteBuffer> ret = new ArrayList<ByteBuffer>();
-    int n = proto.getLogRecordCount();
-    for(int i=0; i < n; i++) {
-      ret.add(ByteBuffer.wrap(proto.getLogRecord(i).toByteArray()));
+
+    try {
+      RaftClientReply reply = raftClient
+          .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+              .toReadLogRequestProto(parent.getName(), currentRecordId, 
numRecords).toByteString()));
+      ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
+      if (proto.hasException()) {
+        LogServiceException e = proto.getException();
+        throw new IOException(e.getErrorMsg());
+      }
+      int n = proto.getLogRecordCount();
+
+      // TODO correct current record
+      currentRecordId += n;
+      List<ByteBuffer> ret = new ArrayList<ByteBuffer>();
+      for (int i = 0; i < n; i++) {
+        ret.add(ByteBuffer.wrap(proto.getLogRecord(i).toByteArray()));
+      }
+      return ret;
+    } catch (Exception e) {
+      throw new IOException(e);
     }
-    return ret;
   }
 
   @Override
   public int readBulk(List<ByteBuffer> buffers) throws IOException {
-    RaftClientReply reply =
-        raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
-            .toReadLogRequestProto(parent.getName(), currentRecordId, 
buffers.size())
-            .toByteString()));
-    ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
-    if (proto.hasException()) {
-      LogServiceException e = proto.getException();
-      throw new IOException(e.getErrorMsg());
-    }
-    //TODO correct current record
-    int n = proto.getLogRecordCount();
-    currentRecordId += n;
-    for(int i=0; i< n; i++) {
-      buffers.get(i).put(proto.getLogRecord(i).toByteArray());
+    try {
+      RaftClientReply reply = 
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
+          .toReadLogRequestProto(parent.getName(), currentRecordId, 
buffers.size()).toByteString()));
+      ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
+      if (proto.hasException()) {
+        LogServiceException e = proto.getException();
+        throw new IOException(e.getErrorMsg());
+      }
+      // TODO correct current record
+      int n = proto.getLogRecordCount();
+      currentRecordId += n;
+      for (int i = 0; i < n; i++) {
+        buffers.get(i).put(proto.getLogRecord(i).toByteArray());
+      }
+      return n;
+    } catch (Exception e) {
+      throw new IOException(e);
     }
-    return n;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
deleted file mode 100644
index 613ec5e..0000000
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.impl;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.api.LogService;
-import org.apache.ratis.logservice.api.LogServiceConfiguration;
-import org.apache.ratis.logservice.api.LogStream;
-import org.apache.ratis.logservice.api.LogStream.State;
-import org.apache.ratis.logservice.api.RecordListener;
-import org.apache.ratis.logservice.proto.LogServiceProtos.*;
-import org.apache.ratis.logservice.util.LogServiceProtoUtil;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
-
-public class LogServiceImpl implements LogService {
-
-  final private RaftClient raftClient;
-  final private LogServiceConfiguration config;
-
-  public LogServiceImpl(RaftClient raftClient, LogServiceConfiguration config) 
{
-    this.raftClient = raftClient;
-    this.config = config;
-  }
-
-  @Override
-  public LogStream createLog(LogName name) throws IOException {
-//    RaftClientReply reply =
-//        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name)
-//            .toByteString()));
-//    CreateLogReplyProto parseFrom = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
-//    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
-    return new LogStreamImpl(name, this);
-  }
-
-  @Override
-  public LogStream getLog(LogName name) throws IOException {
-//    RaftClientReply reply =
-//        
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name)
-//            .toByteString()));
-//    GetLogReplyProto parseFrom = 
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
-//    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
-    return null;
-  }
-
-  @Override
-  public Iterator<LogStream> listLogs() throws IOException {
-//    RaftClientReply reply =
-//        raftClient
-//            
.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString()));
-//    ListLogsReplyProto parseFrom = 
ListLogsReplyProto.parseFrom(reply.getMessage().getContent());
-//    List<LogStreamProto> logStremsList = parseFrom.getLogStremsList();
-//    return LogServiceProtoUtil.toListLogStreams(logStremsList, 
this).iterator();
-    return null;
-  }
-
-  @Override
-  public void closeLog(LogName name) throws IOException {
-    RaftClientReply reply =
-        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toCloseLogRequestProto(name)
-            .toByteString()));
-    CloseLogReplyProto parseFrom = 
CloseLogReplyProto.parseFrom(reply.getMessage().getContent());
-  }
-
-  @Override
-  public State getState(LogName name) throws IOException {
-    RaftClientReply reply =
-        
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name)
-            .toByteString()));
-    GetStateReplyProto parseFrom = 
GetStateReplyProto.parseFrom(reply.getMessage().getContent());
-    return parseFrom.getState() == LogStreamState.OPEN ? State.OPEN : 
State.CLOSED;
-  }
-
-  @Override
-  public void archiveLog(LogName name) throws IOException {
-//    RaftClientReply reply =
-//        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toArchiveLogRequestProto(name)
-//            .toByteString()));
-//    ArchiveLogReplyProto parseFrom =
-//        ArchiveLogReplyProto.parseFrom(reply.getMessage().getContent());
-  }
-
-  @Override
-  public void deleteLog(LogName name) throws IOException {
-//    RaftClientReply reply =
-//        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toDeleteLogRequestProto(name)
-//            .toByteString()));
-//    DeleteLogReplyProto parseFrom = 
DeleteLogReplyProto.parseFrom(reply.getMessage().getContent());
-  }
-
-
-  @Override
-  public void addRecordListener(LogName name, RecordListener listener) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public boolean removeRecordListener(LogName name, RecordListener listener) {
-    // TODO Auto-generated method stub
-    return false;
-  }
-
-  @Override
-  public void close() throws IOException {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public LogStream createLog(LogName name, LogServiceConfiguration config) 
throws IOException {
-    //TODO configuration
-//    RaftClientReply reply =
-//        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name)
-//            .toByteString()));
-//    CreateLogReplyProto parseFrom = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
-//    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
-    return null;
-  }
-
-  @Override
-  public void updateConfiguration(LogName name, LogServiceConfiguration 
config) {
-    // TODO Auto-generated method stub
-
-  }
-
-
-  @Override
-  public RaftClient getRaftClient() {
-    return raftClient;
-  }
-
-  @Override
-  public LogServiceConfiguration getConfiguration() {
-    return config;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
index 2c7feac..2ab0103 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -17,19 +17,26 @@
  */
 package org.apache.ratis.logservice.impl;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.api.LogReader;
-import org.apache.ratis.logservice.api.LogService;
-import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogServiceConfiguration;
+import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogWriter;
 import org.apache.ratis.logservice.api.RecordListener;
-import org.apache.ratis.logservice.proto.LogServiceProtos;
+import 
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLastCommittedIndexReplyProto;
+import 
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthReplyProto;
+import 
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogStartIndexReplyProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceException;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +54,7 @@ public class LogStreamImpl implements LogStream {
   /*
    * Parent log service instance
    */
-  LogService service;
+  RaftClient raftClient;
   /*
    * Log stream configuration
    */
@@ -62,22 +69,16 @@ public class LogStreamImpl implements LogStream {
    */
   long length;
 
-  public LogStreamImpl(LogServiceProtos.LogStreamProto proto, LogService 
service) {
-    this.service = service;
-    this.name = LogName.of(proto.getLogName().getName());
-    this.config = service.getConfiguration();
-    init();
-  }
 
-  public LogStreamImpl(LogName name, LogService logService) {
-    this.service = logService;
+  public LogStreamImpl(LogName name, RaftClient raftClient) {
+    this.raftClient = raftClient;
     this.name = name;
-    this.config = this.service.getConfiguration();
+    this.config = new LogServiceConfiguration();
     init();
   }
 
-  public LogStreamImpl(LogName name, LogService logService, 
LogServiceConfiguration config) {
-    this.service = logService;
+  public LogStreamImpl(LogName name, RaftClient raftClient, 
LogServiceConfiguration config) {
+    this.raftClient = raftClient;
     this.name = name;
     this.config = config;
     init();
@@ -100,9 +101,17 @@ public class LogStreamImpl implements LogStream {
   }
 
   @Override
-  public long getSize() {
-    // TODO use raft client to query state machine
-    return 0;
+  public long getSize() throws IOException{
+      RaftClientReply reply = raftClient
+          .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+              .toGetLengthRequestProto(name).toByteString()));
+      GetLogLengthReplyProto proto =
+          GetLogLengthReplyProto.parseFrom(reply.getMessage().getContent());
+      if (proto.hasException()) {
+        LogServiceException e = proto.getException();
+        throw new IOException(e.getErrorMsg());
+      }
+      return proto.getLength();
   }
 
   @Override
@@ -116,9 +125,39 @@ public class LogStreamImpl implements LogStream {
   }
 
   @Override
-  public long getLastRecordId() {
-    // TODO use raft client to query state machine
-    return 0;
+  public long getLastRecordId() throws IOException {
+    try {
+      RaftClientReply reply = raftClient
+          .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+              .toGetLastCommittedIndexRequestProto(name).toByteString()));
+      GetLogLastCommittedIndexReplyProto proto =
+          
GetLogLastCommittedIndexReplyProto.parseFrom(reply.getMessage().getContent());
+      if (proto.hasException()) {
+        LogServiceException e = proto.getException();
+        throw new IOException(e.getErrorMsg());
+      }
+      return proto.getLastCommittedIndex();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public long getStartRecordId() throws IOException {
+    try {
+      RaftClientReply reply = raftClient
+          .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+              .toGetStartIndexProto(name).toByteString()));
+      GetLogStartIndexReplyProto proto =
+          
GetLogStartIndexReplyProto.parseFrom(reply.getMessage().getContent());
+      if (proto.hasException()) {
+        LogServiceException e = proto.getException();
+        throw new IOException(e.getErrorMsg());
+      }
+      return proto.getStartIndex();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
   }
 
   @Override
@@ -152,8 +191,8 @@ public class LogStreamImpl implements LogStream {
   }
 
   @Override
-  public LogService getLogService() {
-    return service;
+  public RaftClient getRaftClient() {
+    return raftClient;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
index da19e70..e7a7d4a 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
@@ -47,12 +47,13 @@ public class LogWriterImpl implements LogWriter {
   private RaftClient   raftClient;
   /*
    * Log service configuration object
+   * TODO: usage of custom configuration
    */
   private LogServiceConfiguration config;
 
   public LogWriterImpl(LogStream logStream) {
     this.parent = logStream;
-    this.raftClient = logStream.getLogService().getRaftClient();
+    this.raftClient = logStream.getRaftClient();
     this.config = logStream.getConfiguration();
   }
 
@@ -60,41 +61,45 @@ public class LogWriterImpl implements LogWriter {
   public long write(ByteBuffer data) throws IOException {
     List<ByteBuffer> list = new ArrayList<ByteBuffer>();
     list.add(data);
-    RaftClientReply reply =
-            null;
-    try {
-      reply = raftClient.sendAsync(Message.valueOf(LogServiceProtoUtil
-          .toAppendBBEntryLogRequestProto(parent.getName(), list)
-          .toByteString())).get();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    } catch (ExecutionException e) {
-      e.printStackTrace();
-    }
-    AppendLogEntryReplyProto proto = 
AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent());
-    if (proto.hasException()) {
-      LogServiceException e = proto.getException();
-      throw new IOException(e.getErrorMsg());
-    }
-    //TODO current record id
-    return 0;
-  }
+     return write(list);
+   }
 
-  @Override
-  public long sync() throws IOException {
-    RaftClientReply reply =
-        raftClient.send(Message.valueOf(LogServiceProtoUtil
-            .toSyncLogRequestProto(parent.getName())
-            .toByteString()));
-    SyncLogReplyProto proto = 
SyncLogReplyProto.parseFrom(reply.getMessage().getContent());
-    if (proto.hasException()) {
-      LogServiceException e = proto.getException();
-      throw new IOException(e.getErrorMsg());
-    }
-    //TODO current record id
-    return 0;
-  }
+   @Override
+   public long write(List<ByteBuffer> list) throws IOException {
 
+     try {
+       RaftClientReply reply = raftClient.send(Message.valueOf(
+         LogServiceProtoUtil.toAppendBBEntryLogRequestProto(parent.getName(), 
list).toByteString()));
+       AppendLogEntryReplyProto proto =
+           AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent());
+       if (proto.hasException()) {
+         LogServiceException e = proto.getException();
+         throw new IOException(e.getErrorMsg());
+       }
+       List<Long> ids = proto.getRecordIdList();
+       // The above call Always returns one id (regardless of a batch size)
+       return ids.get(0);
+     } catch (Exception e) {
+       throw new IOException(e);
+   }
+ }
+
+ @Override
+ public long sync() throws IOException {
+     try {
+       RaftClientReply reply = raftClient.send(Message
+           
.valueOf(LogServiceProtoUtil.toSyncLogRequestProto(parent.getName()).toByteString()));
+
+       SyncLogReplyProto proto = 
SyncLogReplyProto.parseFrom(reply.getMessage().getContent());
+       if (proto.hasException()) {
+         LogServiceException e = proto.getException();
+         throw new IOException(e.getErrorMsg());
+       }
+       return proto.getLastRecordId();
+     } catch (Exception e) {
+       throw new IOException(e);
+   }
+  }
   @Override
   public void close() throws IOException {
     //TODO

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index ab614d9..6a386b8 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -18,22 +18,50 @@
 
 package org.apache.ratis.logservice.server;
 
+import static org.apache.ratis.logservice.common.Constants.metaGroupID;
+import static org.apache.ratis.logservice.common.Constants.serversGroupID;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.api.LogInfo;
+import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.common.LogAlreadyExistException;
 import org.apache.ratis.logservice.common.LogNotFoundException;
 import org.apache.ratis.logservice.common.NoEnoughWorkersException;
 import org.apache.ratis.logservice.proto.MetaServiceProtos;
-import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
+import 
org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogReplyProto;
+import 
org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogRequestProto;
+import 
org.apache.ratis.logservice.proto.MetaServiceProtos.CreateLogRequestProto;
+import 
org.apache.ratis.logservice.proto.MetaServiceProtos.DeleteLogRequestProto;
+import 
org.apache.ratis.logservice.proto.MetaServiceProtos.LogServicePingRequestProto;
+import 
org.apache.ratis.logservice.proto.MetaServiceProtos.LogServiceRegisterLogRequestProto;
+import 
org.apache.ratis.logservice.proto.MetaServiceProtos.LogServiceUnregisterLogRequestProto;
+import org.apache.ratis.logservice.proto.MetaServiceProtos.MetaSMRequestProto;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
 import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
@@ -41,19 +69,6 @@ import org.apache.ratis.util.AutoCloseableLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.StreamSupport;
-
-import static org.apache.ratis.logservice.common.Constants.metaGroupID;
-import static org.apache.ratis.logservice.common.Constants.serversGroupID;
-
 /**
  * State Machine serving meta data for LogService. It persists the pairs 'log 
name' -> RaftGroup
  * During the start basing on the persisted data it would be able to build a 
list of the existing servers.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
index 59037b8..4a044c6 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -18,16 +18,15 @@
 
 package org.apache.ratis.logservice.util;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.api.LogService;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
-import org.apache.ratis.logservice.impl.LogStreamImpl;
 import org.apache.ratis.logservice.proto.LogServiceProtos;
 import org.apache.ratis.logservice.proto.LogServiceProtos.*;
 import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
@@ -43,7 +42,7 @@ public class LogServiceProtoUtil {
     return LogName.of(logNameProto.getName());
   }
 
-  public static LogStreamProto toLogStreamProto(LogStream logStream) {
+  public static LogStreamProto toLogStreamProto(LogStream logStream) throws 
IOException {
     LogNameProto logNameProto =
             
LogNameProto.newBuilder().setName(logStream.getName().getName()).build();
     LogStreamProto logStreamProto =
@@ -57,11 +56,6 @@ public class LogServiceProtoUtil {
     return logStreamProto;
   }
 
-  public static LogStream toLogStream(LogStreamProto logStream, LogService 
parent) {
-    return new LogStreamImpl(logStream, parent);
-  }
-
-
   public static LogServiceRequestProto toCloseLogRequestProto(LogName logName) 
{
     LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
     CloseLogRequestProto closeLog =
@@ -89,6 +83,15 @@ public class LogServiceProtoUtil {
     return 
LogServiceRequestProto.newBuilder().setLengthQuery(builder.build()).build();
   }
 
+  public static LogServiceRequestProto 
toGetLastCommittedIndexRequestProto(LogName name) {
+    LogNameProto logNameProto =
+        LogNameProto.newBuilder().setName(name.getName()).build();
+    GetLogLastCommittedIndexRequestProto.Builder builder =
+        GetLogLastCommittedIndexRequestProto.newBuilder();
+    builder.setLogName(logNameProto);
+    return 
LogServiceRequestProto.newBuilder().setLastIndexQuery(builder.build()).build();
+  }
+
   public static LogServiceRequestProto toGetStartIndexProto(LogName name) {
     LogNameProto logNameProto =
         LogNameProto.newBuilder().setName(name.getName()).build();
@@ -139,15 +142,6 @@ public class LogServiceProtoUtil {
     return 
LogServiceRequestProto.newBuilder().setAppendRequest(builder.build()).build();
   }
 
-  public static List<LogStream> toListLogStreams(List<LogStreamProto> 
logStreamProtos,
-      LogService parent) {
-    List<LogStream> logStreams = new ArrayList<>(logStreamProtos.size());
-    for (LogStreamProto proto : logStreamProtos) {
-      logStreams.add(toLogStream(proto, parent));
-    }
-    return logStreams;
-  }
-
   public static List<byte[]> toListByteArray(List<ByteString> list) {
     List<byte[]> retVal = new ArrayList<byte[]>(list.size());
     for(int i=0; i < list.size(); i++) {
@@ -181,6 +175,19 @@ public class LogServiceProtoUtil {
     return builder.build();
   }
 
+  public static GetLogLastCommittedIndexReplyProto
+        toGetLogLastIndexReplyProto(long lastIndex, Throwable t) {
+
+    GetLogLastCommittedIndexReplyProto.Builder builder =
+        GetLogLastCommittedIndexReplyProto.newBuilder();
+    if (t != null) {
+      builder.setException(toLogException(t));
+    } else {
+      builder.setLastCommittedIndex(lastIndex);
+    }
+    return builder.build();
+  }
+
   public static ReadLogReplyProto toReadLogReplyProto(List<byte[]> entries, 
Throwable t) {
     ReadLogReplyProto.Builder builder = ReadLogReplyProto.newBuilder();
     if (t != null) {
@@ -198,18 +205,19 @@ public class LogServiceProtoUtil {
     if (t!= null) {
       builder.setException(toLogException(t));
     } else if (ids != null){
-      int index = 0;
       for(long id: ids) {
-        builder.setRecordId(index++, id);
+        builder.addRecordId(id);
       }
     }
     return builder.build();
   }
 
-  public static SyncLogReplyProto toSyncLogReplyProto(Throwable t) {
+  public static SyncLogReplyProto toSyncLogReplyProto(long index, Throwable t) 
{
     SyncLogReplyProto.Builder builder = SyncLogReplyProto.newBuilder();
     if (t != null) {
       builder.setException(toLogException(t));
+    } else {
+      builder.setLastRecordId(index);
     }
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/main/proto/LogService.proto
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/proto/LogService.proto 
b/ratis-logservice/src/main/proto/LogService.proto
index 479a0f7..da15aad 100644
--- a/ratis-logservice/src/main/proto/LogService.proto
+++ b/ratis-logservice/src/main/proto/LogService.proto
@@ -80,8 +80,9 @@ message SyncLogRequestProto {
 
 // Sync reply
 message SyncLogReplyProto {
+       uint64 lastRecordId = 1;
        // optional
-       LogServiceException exception = 1;
+       LogServiceException exception = 2;
 }
 
 // Read request
@@ -121,6 +122,16 @@ message GetLogStartIndexReplyProto {
        LogServiceException exception = 2;
 }
 
+message GetLogLastCommittedIndexRequestProto {
+       LogNameProto logName  = 1;
+}
+
+message GetLogLastCommittedIndexReplyProto {
+       uint64 lastCommittedIndex = 1;
+       //optional
+       LogServiceException exception = 2;
+}
+
 message LogServiceRequestProto {
   oneof Request {
     CloseLogRequestProto closeLog = 1;
@@ -130,6 +141,7 @@ message LogServiceRequestProto {
        GetLogStartIndexRequestProto startIndexQuery = 5;
        AppendLogEntryRequestProto appendRequest = 6;
        SyncLogRequestProto          syncRequest = 7;
+       GetLogLastCommittedIndexRequestProto lastIndexQuery = 8;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b901b3a5/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
index 1146802..8c21ecc 100644
--- 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
@@ -27,7 +27,6 @@ import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.api.LogService;
 import org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogStateMachine;
 import org.apache.ratis.logservice.api.LogStream;
@@ -35,10 +34,10 @@ import org.apache.ratis.logservice.api.LogStream.State;
 import org.apache.ratis.statemachine.StateMachine;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 public abstract class LogServiceBaseTest<CLUSTER extends MiniRaftCluster>
     extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
@@ -63,27 +62,27 @@ public abstract class LogServiceBaseTest<CLUSTER extends 
MiniRaftCluster>
 
   @Test
   public void testLogServiceAdminAPIs() throws Exception {
-    RaftClient raftClient =
-        
RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
-            .build();
-    LogService logService = 
LogServiceFactory.getInstance().createLogService(raftClient,
-      new LogServiceConfiguration());
-    LogName logName = LogName.of("log1");
-    LogStream logStream = logService.createLog(logName);
-    assertEquals("log1", logStream.getName().getName());
-    assertEquals(State.OPEN, logStream.getState());
-    assertEquals(0, logStream.getSize());
-    logService.getLog(logName);
-    assertEquals("log1", logStream.getName().getName());
-    assertEquals(State.OPEN, logStream.getState());
-    assertEquals(0, logStream.getSize());
-    // TODO fix me
-    //    logStream = logService.listLogs().next();
-    //    assertEquals("log1", logStream.getName().getName());
-    //    assertEquals(State.OPEN, logStream.getState());
-    //    assertEquals(0, logStream.getSize());
-    State state = logService.getState(logName);
-    assertEquals(State.OPEN, state);
+//    RaftClient raftClient =
+//        
RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
+//            .build();
+//    LogStream logService = 
LogServiceFactory.getInstance().createLogService(raftClient,
+//      new LogServiceConfiguration());
+//    LogName logName = LogName.of("log1");
+//    LogStream logStream = logService.createLog(logName);
+//    assertEquals("log1", logStream.getName().getName());
+//    assertEquals(State.OPEN, logStream.getState());
+//    assertEquals(0, logStream.getSize());
+//    logService.getLog(logName);
+//    assertEquals("log1", logStream.getName().getName());
+//    assertEquals(State.OPEN, logStream.getState());
+//    assertEquals(0, logStream.getSize());
+//    // TODO fix me
+//    //    logStream = logService.listLogs().next();
+//    //    assertEquals("log1", logStream.getName().getName());
+//    //    assertEquals(State.OPEN, logStream.getState());
+//    //    assertEquals(0, logStream.getSize());
+//    State state = logService.getState(logName);
+//    assertEquals(State.OPEN, state);
   }
 
   @After

Reply via email to