This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3419793 RATIS-555. Move RaftLog and the implementations to separated
packages.
3419793 is described below
commit 34197939571287dd0b81896499b028b82a29b0fa
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed May 15 16:19:07 2019 -0700
RATIS-555. Move RaftLog and the implementations to separated packages.
---
.../main/java/org/apache/ratis/util/JavaUtils.java | 8 -----
.../logservice/server/LogServiceRaftLogReader.java | 4 +--
.../ratis/logservice/server/LogStateMachine.java | 2 +-
.../org/apache/ratis/server/impl/FollowerInfo.java | 2 +-
.../org/apache/ratis/server/impl/LeaderState.java | 2 +-
.../org/apache/ratis/server/impl/LogAppender.java | 6 ++--
.../apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../org/apache/ratis/server/impl/ServerState.java | 5 ++-
.../ratis/server/impl/StateMachineUpdater.java | 2 +-
.../ratis/server/{storage => raftlog}/RaftLog.java | 19 +++++-----
.../{storage => raftlog}/RaftLogIOException.java | 4 +--
.../server/{storage => raftlog}/RaftLogIndex.java | 2 +-
.../{storage => raftlog}/RaftLogSequentialOps.java | 2 +-
.../{storage => raftlog/memory}/MemoryRaftLog.java | 11 +++---
.../segmented}/BufferedChannelBase.java | 4 +--
.../segmented}/BufferedWriteChannel.java | 4 +--
.../segmented}/CacheInvalidationPolicy.java | 6 ++--
.../{storage => raftlog/segmented}/LogSegment.java | 8 +++--
.../segmented}/SegmentedRaftLog.java | 42 ++++++++++------------
.../segmented/SegmentedRaftLogCache.java} | 13 +++----
.../segmented}/SegmentedRaftLogFormat.java | 2 +-
.../segmented/SegmentedRaftLogInputStream.java} | 23 ++++++------
.../segmented/SegmentedRaftLogOutputStream.java} | 8 ++---
.../segmented/SegmentedRaftLogReader.java} | 14 ++++----
.../segmented/SegmentedRaftLogWorker.java} | 27 +++++++-------
.../org/apache/ratis/server/storage/MetaFile.java | 8 ++---
.../apache/ratis/server/storage/RaftStorage.java | 4 +--
.../ratis/server/storage/RaftStorageDirectory.java | 6 ++--
.../java/org/apache/ratis/LogAppenderTests.java | 2 +-
.../java/org/apache/ratis/MiniRaftCluster.java | 4 +--
.../test/java/org/apache/ratis/RaftBasicTests.java | 2 +-
.../org/apache/ratis/RaftExceptionBaseTest.java | 4 +--
.../test/java/org/apache/ratis/RaftTestUtil.java | 2 +-
.../java/org/apache/ratis/RetryCacheTests.java | 4 +--
.../apache/ratis/server/ServerRestartTests.java | 6 ++--
.../ratis/server/impl/LeaderElectionTests.java | 6 ++--
.../server/impl/RaftReconfigurationBaseTest.java | 2 +-
.../impl/RaftStateMachineExceptionTests.java | 2 +-
.../segmented/SegmentedRaftLogTestUtils.java} | 25 ++++++-------
.../ratis/server/storage/RaftStorageTestUtils.java | 16 ++-------
.../ratis/statemachine/RaftSnapshotBaseTest.java | 2 +-
.../statemachine/SimpleStateMachine4Testing.java | 8 ++---
.../ratis/grpc/TestInstallSnapshotWithGrpc.java | 2 +-
.../java/org/apache/ratis/grpc/TestRaftStream.java | 2 +-
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 2 +-
.../{storage => raftlog}/TestRaftLogIndex.java | 2 +-
.../server/{ => raftlog}/TestRaftLogMetrics.java | 11 +++---
.../segmented}/TestCacheEviction.java | 9 ++---
.../segmented/TestLogSegment.java} | 28 +++++++--------
.../segmented}/TestRaftLogReadWrite.java | 38 +++++++++-----------
.../segmented}/TestSegmentedRaftLog.java | 18 +++++-----
.../segmented/TestSegmentedRaftLogCache.java} | 13 ++++---
52 files changed, 212 insertions(+), 238 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 94e2640..4e4d6c0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -165,14 +165,6 @@ public interface JavaUtils {
throw new IllegalStateException("BUG: this line should be unreachable.");
}
- /** @deprecated use {@link #attempt(CheckedRunnable, int, TimeDuration,
String, Logger)} */
- @Deprecated
- static <THROWABLE extends Throwable> void attempt(
- CheckedRunnable<THROWABLE> op, int numAttempts, long sleepMs, String
name, Logger log)
- throws THROWABLE, InterruptedException {
- attempt(op, numAttempts, TimeDuration.valueOf(sleepMs,
TimeUnit.MILLISECONDS), name, log);
- }
-
/** Attempt to run the given op multiple times. */
static <THROWABLE extends Throwable> void attempt(
CheckedRunnable<THROWABLE> runnable, int numAttempts, TimeDuration
sleepTime, String name, Logger log)
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
index da31f8f..8c6e00d 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
@@ -25,8 +25,8 @@ import
org.apache.ratis.logservice.proto.LogServiceProtos.AppendLogEntryRequestP
import
org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
import
org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto.RequestCase;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.server.storage.RaftLogIOException;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index ef6c385..aafb16a 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -49,7 +49,7 @@ import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.ServerState;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index ec02553..11cf87f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -19,7 +19,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.storage.RaftLogIndex;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 844434f..057936b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -22,7 +22,7 @@ import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 0052a2f..ff13136 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -21,10 +21,10 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLog.EntryWithData;
import org.apache.ratis.server.storage.FileInfo;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.server.storage.RaftLogIOException;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog.EntryWithData;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.statemachine.SnapshotInfo;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 584332d..90241bd 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -28,7 +28,7 @@ import
org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index aa21956..7827ea5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -21,6 +21,9 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.storage.*;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -198,7 +201,7 @@ public class ServerState implements Closeable {
return log;
}
- public RaftConfiguration getRaftConf() {
+ RaftConfiguration getRaftConf() {
return configurationManager.getCurrent();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 8e67907..97fe4e4 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -20,7 +20,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
similarity index 96%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index 84e20ab..b7d2d5c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftPeerId;
@@ -292,7 +292,7 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
/**
* Validate the term and index of entry w.r.t RaftLog
*/
- void validateLogEntry(LogEntryProto entry) {
+ protected void validateLogEntry(LogEntryProto entry) {
if (entry.hasMetadataEntry()) {
return;
}
@@ -310,8 +310,7 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
return runner.runSequentially(() -> truncateImpl(index));
}
- abstract CompletableFuture<Long> truncateImpl(long index);
-
+ protected abstract CompletableFuture<Long> truncateImpl(long index);
/**
* Purge asynchronously delete the segment files which does not overlap with
the given index.
@@ -323,7 +322,7 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
return purgeImpl(index);
}
- abstract CompletableFuture<Long> purgeImpl(long index);
+ protected abstract CompletableFuture<Long> purgeImpl(long index);
@Override
@@ -331,14 +330,14 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
return runner.runSequentially(() -> appendEntryImpl(entry));
}
- abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto entry);
+ protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto
entry);
@Override
public final List<CompletableFuture<Long>> append(LogEntryProto... entries) {
return runner.runSequentially(() -> appendImpl(entries));
}
- abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries);
+ protected abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto...
entries);
/**
* @return the index of the latest entry that has been flushed to the local
@@ -422,10 +421,10 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
* Holds proto entry along with future which contains read state machine data
*/
public class EntryWithData {
- private LogEntryProto logEntry;
- private CompletableFuture<ByteString> future;
+ private final LogEntryProto logEntry;
+ private final CompletableFuture<ByteString> future;
- EntryWithData(LogEntryProto logEntry, CompletableFuture<ByteString>
future) {
+ public EntryWithData(LogEntryProto logEntry, CompletableFuture<ByteString>
future) {
this.logEntry = logEntry;
this.future = future;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIOException.java
similarity index 96%
copy from
ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
copy to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIOException.java
index 5b16b13..37ab9ae 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIOException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog;
import org.apache.ratis.protocol.RaftException;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
similarity index 98%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
index 67e7e14..1c42d84 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.StringUtils;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
similarity index 99%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
index 8ed92b8..5e39596 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.StateMachineException;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
similarity index 95%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 83ae36f..3626755 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.memory;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.Preconditions;
@@ -113,7 +114,7 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- CompletableFuture<Long> truncateImpl(long index) {
+ protected CompletableFuture<Long> truncateImpl(long index) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
Preconditions.assertTrue(index >= 0);
@@ -124,7 +125,7 @@ public class MemoryRaftLog extends RaftLog {
@Override
- CompletableFuture<Long> purgeImpl(long index) {
+ protected CompletableFuture<Long> purgeImpl(long index) {
try (AutoCloseableLock writeLock = writeLock()) {
Preconditions.assertTrue(index >= 0);
entries.purge(Math.toIntExact(index));
@@ -141,7 +142,7 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
+ protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
validateLogEntry(entry);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedChannelBase.java
similarity index 96%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedChannelBase.java
index 0dfc6f0..bfb1689 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedChannelBase.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import java.io.Closeable;
import java.io.IOException;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
similarity index 98%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index e61a4d3..3b302e7 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import java.io.IOException;
import java.nio.ByteBuffer;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java
similarity index 97%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java
index a794092..f5b65c3 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.ratis.server.storage.RaftLogCache.LogSegmentList;
+import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.LogSegmentList;
import org.apache.ratis.util.AutoCloseableLock;
public interface CacheInvalidationPolicy {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
similarity index 97%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index a74d6a8..5bf6e8a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.storage.RaftStorage;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
@@ -110,7 +112,7 @@ class LogSegment implements Comparable<Long> {
private static int readSegmentFile(File file, long start, long end,
boolean isOpen, Consumer<LogEntryProto> entryConsumer) throws
IOException {
int count = 0;
- try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) {
+ try (SegmentedRaftLogInputStream in = new
SegmentedRaftLogInputStream(file, start, end, isOpen)) {
for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null;
prev = next) {
if (prev != null) {
Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
similarity index 92%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 607816e..3e2fb7b 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeerId;
@@ -23,9 +23,12 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.LogSegment.LogRecord;
-import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
-import org.apache.ratis.server.storage.RaftLogCache.TruncateIndices;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
+import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecordWithEntry;
+import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
@@ -72,7 +75,7 @@ public class SegmentedRaftLog extends RaftLog {
/**
* I/O task definitions.
*/
- static abstract class Task {
+ abstract static class Task {
private final CompletableFuture<Long> future = new CompletableFuture<>();
CompletableFuture<Long> getFuture() {
@@ -100,8 +103,8 @@ public class SegmentedRaftLog extends RaftLog {
private final Optional<RaftServerImpl> server;
private final RaftStorage storage;
- private final RaftLogCache cache;
- private final RaftLogWorker fileLogWorker;
+ private final SegmentedRaftLogCache cache;
+ private final SegmentedRaftLogWorker fileLogWorker;
private final long segmentMaxSize;
private final boolean stateMachineCachingEnabled;
@@ -119,8 +122,8 @@ public class SegmentedRaftLog extends RaftLog {
this.server = Optional.ofNullable(server);
this.storage = storage;
segmentMaxSize =
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
- cache = new RaftLogCache(selfId, storage, properties);
- this.fileLogWorker = new RaftLogWorker(selfId, stateMachine,
submitUpdateCommitEvent, storage, properties);
+ this.cache = new SegmentedRaftLogCache(selfId, storage, properties);
+ this.fileLogWorker = new SegmentedRaftLogWorker(selfId, stateMachine,
submitUpdateCommitEvent, storage, properties);
stateMachineCachingEnabled =
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
}
@@ -249,15 +252,11 @@ public class SegmentedRaftLog extends RaftLog {
}
}
- /**
- * The method, along with {@link #appendEntry} and
- * {@link #append(LogEntryProto...)} need protection of RaftServer's lock.
- */
@Override
- CompletableFuture<Long> truncateImpl(long index) {
+ protected CompletableFuture<Long> truncateImpl(long index) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
- RaftLogCache.TruncationSegments ts = cache.truncate(index);
+ SegmentedRaftLogCache.TruncationSegments ts = cache.truncate(index);
if (ts != null) {
Task task = fileLogWorker.truncate(ts, index);
return task.getFuture();
@@ -268,9 +267,9 @@ public class SegmentedRaftLog extends RaftLog {
@Override
- public CompletableFuture<Long> purgeImpl(long index) {
+ protected CompletableFuture<Long> purgeImpl(long index) {
try (AutoCloseableLock writeLock = writeLock()) {
- RaftLogCache.TruncationSegments ts = cache.purge(index);
+ SegmentedRaftLogCache.TruncationSegments ts = cache.purge(index);
LOG.debug("truncating segments:{}", ts);
if (ts != null) {
Task task = fileLogWorker.purge(ts);
@@ -281,7 +280,7 @@ public class SegmentedRaftLog extends RaftLog {
}
@Override
- CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
+ protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
checkLogState();
if (LOG.isTraceEnabled()) {
LOG.trace("{}: appendEntry {}", getSelfId(),
@@ -383,11 +382,6 @@ public class SegmentedRaftLog extends RaftLog {
return fileLogWorker.getFlushedIndex();
}
- /**
- * {@inheritDoc}
- *
- * This operation is protected by the RaftServer's lock
- */
@Override
public void writeMetadata(long term, RaftPeerId votedFor) throws IOException
{
storage.getMetaFile().set(term, votedFor != null ? votedFor.toString() :
null);
@@ -424,7 +418,7 @@ public class SegmentedRaftLog extends RaftLog {
storage.close();
}
- RaftLogCache getRaftLogCache() {
+ SegmentedRaftLogCache getRaftLogCache() {
return cache;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
similarity index 97%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 8d4fc12..b94c078 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -24,8 +24,9 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
-import
org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
-import org.apache.ratis.server.storage.LogSegment.LogRecord;
+import org.apache.ratis.server.storage.RaftStorage;
+import
org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
+import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.AutoCloseableReadWriteLock;
@@ -45,8 +46,8 @@ import static
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX
* caches all the segments in the memory. The cache is not thread-safe and
* requires external lock protection.
*/
-class RaftLogCache {
- public static final Logger LOG = LoggerFactory.getLogger(RaftLogCache.class);
+class SegmentedRaftLogCache {
+ public static final Logger LOG =
LoggerFactory.getLogger(SegmentedRaftLogCache.class);
static class SegmentFileInfo {
final long startIndex; // start index of the segment
@@ -281,7 +282,7 @@ class RaftLogCache {
private final int maxCachedSegments;
private final CacheInvalidationPolicy evictionPolicy = new
CacheInvalidationPolicyDefault();
- RaftLogCache(RaftPeerId selfId, RaftStorage storage, RaftProperties
properties) {
+ SegmentedRaftLogCache(RaftPeerId selfId, RaftStorage storage, RaftProperties
properties) {
this.name = selfId + "-" + getClass().getSimpleName();
this.closedSegments = new LogSegmentList(name);
this.storage = storage;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogFormat.java
similarity index 98%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogFormat.java
index 37b3cc6..57c6e88 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogFormat.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedFunction;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
similarity index 90%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
index 6eb1e38..c0f9e17 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import static
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
@@ -32,8 +32,8 @@ import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LogInputStream implements Closeable {
- static final Logger LOG = LoggerFactory.getLogger(LogInputStream.class);
+public class SegmentedRaftLogInputStream implements Closeable {
+ static final Logger LOG =
LoggerFactory.getLogger(SegmentedRaftLogInputStream.class);
static class LogValidation {
private final long validLength;
@@ -64,10 +64,9 @@ public class LogInputStream implements Closeable {
private final long endIndex;
private final boolean isOpen;
private final OpenCloseState state;
- private LogReader reader;
+ private SegmentedRaftLogReader reader;
- public LogInputStream(File log, long startIndex, long endIndex,
- boolean isOpen) {
+ public SegmentedRaftLogInputStream(File log, long startIndex, long endIndex,
boolean isOpen) {
if (isOpen) {
Preconditions.assertTrue(endIndex == INVALID_LOG_INDEX);
} else {
@@ -84,7 +83,7 @@ public class LogInputStream implements Closeable {
private void init() throws IOException {
state.open();
try {
- final LogReader r = new LogReader(logFile);
+ final SegmentedRaftLogReader r = new SegmentedRaftLogReader(logFile);
if (r.verifyHeader()) {
reader = r;
}
@@ -159,7 +158,7 @@ public class LogInputStream implements Closeable {
@Override
public void close() throws IOException {
if (state.close()) {
- Optional.ofNullable(reader).ifPresent(LogReader::close);
+ Optional.ofNullable(reader).ifPresent(SegmentedRaftLogReader::close);
}
}
@@ -183,9 +182,9 @@ public class LogInputStream implements Closeable {
*/
static LogValidation scanEditLog(File file, long maxTxIdToScan)
throws IOException {
- LogInputStream in;
+ SegmentedRaftLogInputStream in;
try {
- in = new LogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX,
false);
+ in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX,
INVALID_LOG_INDEX, false);
// read the header, initialize the inputstream
in.init();
} catch (EOFException e) {
@@ -212,7 +211,7 @@ public class LogInputStream implements Closeable {
* portion beyond this index is potentially being
* updated.
*/
- static LogValidation scanEditLog(LogInputStream in, long maxIndexToScan) {
+ static LogValidation scanEditLog(SegmentedRaftLogInputStream in, long
maxIndexToScan) {
long lastPos = 0;
long end = INVALID_LOG_INDEX;
long numValid = 0;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
similarity index 95%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 5854e01..a18aad5 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
@@ -34,8 +34,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.zip.Checksum;
-public class LogOutputStream implements Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(LogOutputStream.class);
+public class SegmentedRaftLogOutputStream implements Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(SegmentedRaftLogOutputStream.class);
private static final ByteBuffer fill;
private static final int BUFFER_SIZE = 1024 * 1024; // 1 MB
@@ -56,7 +56,7 @@ public class LogOutputStream implements Closeable {
private final long preallocatedSize;
private long preallocatedPos;
- public LogOutputStream(File file, boolean append, long segmentMaxSize,
+ public SegmentedRaftLogOutputStream(File file, boolean append, long
segmentMaxSize,
long preallocatedSize, int bufferSize)
throws IOException {
this.file = file;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
similarity index 96%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index b15b72f..f6203c0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.io.CorruptedFileException;
import org.apache.ratis.protocol.ChecksumException;
@@ -33,20 +33,20 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.zip.Checksum;
-public class LogReader implements Closeable {
- static final Logger LOG = LoggerFactory.getLogger(LogReader.class);
+class SegmentedRaftLogReader implements Closeable {
+ static final Logger LOG =
LoggerFactory.getLogger(SegmentedRaftLogReader.class);
/**
* InputStream wrapper that keeps track of the current stream position.
*
* This stream also allows us to set a limit on how many bytes we can read
* without getting an exception.
*/
- public static class LimitedInputStream extends FilterInputStream {
+ static class LimitedInputStream extends FilterInputStream {
private long curPos = 0;
private long markPos = -1;
private long limitPos = Long.MAX_VALUE;
- public LimitedInputStream(InputStream is) {
+ LimitedInputStream(InputStream is) {
super(is);
}
@@ -131,7 +131,7 @@ public class LogReader implements Closeable {
private byte[] temp = new byte[4096];
private final Checksum checksum;
- LogReader(File file) throws FileNotFoundException {
+ SegmentedRaftLogReader(File file) throws FileNotFoundException {
this.file = file;
this.limiter = new LimitedInputStream(
new BufferedInputStream(new FileInputStream(file)));
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
similarity index 94%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 58e5a20..a8752ec 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
@@ -26,9 +26,10 @@ import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.storage.RaftLogCache.SegmentFileInfo;
-import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
-import org.apache.ratis.server.storage.SegmentedRaftLog.Task;
+import org.apache.ratis.server.storage.RaftStorage;
+import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.SegmentFileInfo;
+import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog.Task;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.*;
@@ -47,8 +48,8 @@ import java.util.function.Supplier;
* This class takes the responsibility of all the raft log related I/O ops for
a
* raft peer.
*/
-class RaftLogWorker implements Runnable {
- static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class);
+class SegmentedRaftLogWorker implements Runnable {
+ static final Logger LOG =
LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1,
TimeUnit.SECONDS);
@@ -94,13 +95,13 @@ class RaftLogWorker implements Runnable {
private final Thread workerThread;
private final RaftStorage storage;
- private volatile LogOutputStream out;
+ private volatile SegmentedRaftLogOutputStream out;
private final Runnable submitUpdateCommitEvent;
private final StateMachine stateMachine;
private final Supplier<Timer> logFlushTimer;
/**
- * The number of entries that have been written into the LogOutputStream but
+ * The number of entries that have been written into the
SegmentedRaftLogOutputStream but
* has not been flushed.
*/
private int pendingFlushNum = 0;
@@ -117,7 +118,7 @@ class RaftLogWorker implements Runnable {
private final StateMachineDataPolicy stateMachineDataPolicy;
- RaftLogWorker(RaftPeerId selfId, StateMachine stateMachine, Runnable
submitUpdateCommitEvent,
+ SegmentedRaftLogWorker(RaftPeerId selfId, StateMachine stateMachine,
Runnable submitUpdateCommitEvent,
RaftStorage storage, RaftProperties properties) {
this.name = selfId + "-" + getClass().getSimpleName();
LOG.info("new {} for {}", name, storage);
@@ -142,7 +143,7 @@ class RaftLogWorker implements Runnable {
// Server Id can be null in unit tests
this.logFlushTimer = JavaUtils.memoize(() ->
RatisMetricsRegistry.getRegistry()
- .timer(MetricRegistry.name(RaftLogWorker.class, selfId.toString(),
"flush-time")));
+ .timer(MetricRegistry.name(SegmentedRaftLogWorker.class,
selfId.toString(), "flush-time")));
}
void start(long latestIndex, File openSegmentFile) throws IOException {
@@ -151,7 +152,7 @@ class RaftLogWorker implements Runnable {
flushedIndex = latestIndex;
if (openSegmentFile != null) {
Preconditions.assertTrue(openSegmentFile.exists());
- out = new LogOutputStream(openSegmentFile, true, segmentMaxSize,
+ out = new SegmentedRaftLogOutputStream(openSegmentFile, true,
segmentMaxSize,
preallocatedSize, bufferSize);
}
workerThread.start();
@@ -197,7 +198,7 @@ class RaftLogWorker implements Runnable {
} catch (Throwable t) {
if (t instanceof InterruptedException && !running) {
LOG.info("Got InterruptedException when adding task " + task
- + ". The RaftLogWorker already stopped.");
+ + ". The SegmentedRaftLogWorker already stopped.");
} else {
ExitUtils.terminate(2, "Failed to add IO task " + task, t, LOG);
}
@@ -462,7 +463,7 @@ class RaftLogWorker implements Runnable {
Preconditions.assertTrue(!openFile.exists(), "open file %s exists for
%s",
openFile, name);
Preconditions.assertTrue(out == null && pendingFlushNum == 0);
- out = new LogOutputStream(openFile, false, segmentMaxSize,
+ out = new SegmentedRaftLogOutputStream(openFile, false, segmentMaxSize,
preallocatedSize, bufferSize);
Preconditions.assertTrue(openFile.exists(), "Failed to create file %s
for %s",
openFile.getAbsolutePath(), name);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
index 0e4ccbd..329f174 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
@@ -31,7 +31,7 @@ import java.util.Properties;
* a single <code>long</code> value. The file is updated atomically
* and durably (i.e fsynced).
*/
-class MetaFile {
+public class MetaFile {
private static final Logger LOG = LoggerFactory.getLogger(MetaFile.class);
private static final String TERM_KEY = "term";
private static final String VOTEDFOR_KEY = "votedFor";
@@ -53,7 +53,7 @@ class MetaFile {
return this.file.exists();
}
- long getTerm() throws IOException {
+ public long getTerm() throws IOException {
if (!loaded) {
readFile();
loaded = true;
@@ -61,7 +61,7 @@ class MetaFile {
return term;
}
- String getVotedFor() throws IOException {
+ public String getVotedFor() throws IOException {
if (!loaded) {
readFile();
loaded = true;
@@ -69,7 +69,7 @@ class MetaFile {
return votedFor;
}
- void set(long newTerm, String newVotedFor) throws IOException {
+ public void set(long newTerm, String newVotedFor) throws IOException {
newVotedFor = newVotedFor == null ? EMPTY_VOTEFOR : newVotedFor;
if (!loaded || (newTerm != term || !newVotedFor.equals(votedFor))) {
writeFile(newTerm, newVotedFor);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index bcb2b6e..35fcaaf 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -116,7 +116,7 @@ public class RaftStorage implements Closeable {
storageDir.unlock();
}
- MetaFile getMetaFile() {
+ public MetaFile getMetaFile() {
return metaFile;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
index 5127e74..9cc7db0 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -153,7 +153,7 @@ public class RaftStorageDirectory {
return new File(getCurrentDir(), META_FILE_NAME + CONF_EXTENSION);
}
- File getOpenLogFile(long startIndex) {
+ public File getOpenLogFile(long startIndex) {
return new File(getCurrentDir(), getOpenLogFileName(startIndex));
}
@@ -161,7 +161,7 @@ public class RaftStorageDirectory {
return LOG_FILE_PREFIX + "_" + LOG_FILE_INPROGRESS + "_" + startIndex;
}
- File getClosedLogFile(long startIndex, long endIndex) {
+ public File getClosedLogFile(long startIndex, long endIndex) {
return new File(getCurrentDir(), getClosedLogFileName(startIndex,
endIndex));
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index c8ddc0d..6d92bbe 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -28,7 +28,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.impl.ServerState;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.LogUtils;
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index de7dc0e..22888ec 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -37,8 +37,8 @@ import
org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.storage.MemoryRaftLog;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.CollectionUtils;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 3478d9f..c01fdc2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -30,7 +30,7 @@ import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 35747a6..0cc9265 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -24,8 +24,8 @@ import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.server.storage.RaftLogIOException;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.SizeInBytes;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 93af79d..5b26419 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -33,7 +33,7 @@ import
org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.CollectionUtils;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 60f62cd..20656b2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -29,8 +29,8 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.server.storage.RaftLogIOException;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.TimeDuration;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 3b18ade..9cfb436 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -33,10 +33,10 @@ import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.impl.ServerState;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.server.storage.RaftLogIOException;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogFormat;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
-import org.apache.ratis.server.storage.SegmentedRaftLogFormat;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.FileUtils;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index f31620f..abcd403 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -23,7 +23,7 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.storage.RaftStorageTestUtils;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
@@ -63,7 +63,7 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
@Test
public void testChangeLeader() throws Exception {
- RaftStorageTestUtils.setRaftLogWorkerLogLevel(Level.TRACE);
+ SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.TRACE);
LOG.info("Running testChangeLeader");
final MiniRaftCluster cluster = newCluster(3);
cluster.start();
@@ -73,7 +73,7 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
leader = RaftTestUtil.changeLeader(cluster, leader,
IllegalStateException::new);
ExitUtils.assertNotTerminated();
}
- RaftStorageTestUtils.setRaftLogWorkerLogLevel(Level.INFO);
+ SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.INFO);
cluster.shutdown();
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index f96ceff..d937950 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -28,7 +28,7 @@ import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index 8fd60b7..ed3929f 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -26,7 +26,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
similarity index 63%
rename from
ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
rename to
ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
index 5b16b13..52931d4 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,23 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
-import org.apache.ratis.protocol.RaftException;
+import org.apache.log4j.Level;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.LogUtils;
-/**
- * Exception while reading/writing RaftLog
- */
-public class RaftLogIOException extends RaftException {
- public RaftLogIOException(Throwable cause) {
- super(cause);
- }
-
- public RaftLogIOException(String msg) {
- super(msg);
+public interface SegmentedRaftLogTestUtils {
+ static void setRaftLogWorkerLogLevel(Level level) {
+ LogUtils.setLogLevel(SegmentedRaftLogWorker.LOG, level);
}
- public RaftLogIOException(String message, Throwable cause) {
- super(message, cause);
+ static String getLogFlushTimeMetric(RaftPeerId serverId) {
+ return SegmentedRaftLogWorker.class.getName() + "." + serverId +
".flush-time";
}
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index e681b66..29fe962 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,25 +17,15 @@
*/
package org.apache.ratis.server.storage;
-import org.apache.log4j.Level;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.util.AutoCloseableLock;
-import org.apache.ratis.util.LogUtils;
import java.util.function.Consumer;
public interface RaftStorageTestUtils {
- static void setRaftLogWorkerLogLevel(Level level) {
- LogUtils.setLogLevel(RaftLogWorker.LOG, level);
- }
-
- static String getLogFlushTimeMetric(RaftPeerId serverId) {
- return RaftLogWorker.class.getName() + "." + serverId + ".flush-time";
- }
-
static void printLog(RaftLog log, Consumer<String> println) {
if (log == null) {
println.accept("log == null");
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 795c7b2..4f30a50 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -29,7 +29,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 5e5c9c6..0b6491e 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -33,8 +33,8 @@ import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.LogInputStream;
-import org.apache.ratis.server.storage.LogOutputStream;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogInputStream;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogOutputStream;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -248,7 +248,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
termIndex.getIndex());
LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}",
termIndex.getTerm(),
termIndex.getIndex(), snapshotFile);
- try (LogOutputStream out = new LogOutputStream(snapshotFile, false,
+ try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(snapshotFile, false,
segmentMaxSize, preallocatedSize, bufferSize)) {
for (final LogEntryProto entry : indexMap.values()) {
if (entry.getIndex() > endIndex) {
@@ -294,7 +294,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
} else {
LOG.info("Loading snapshot {}", snapshot);
final long endIndex = snapshot.getIndex();
- try (LogInputStream in = new LogInputStream(
+ try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
LogEntryProto entry;
while ((entry = in.nextEntry()) != null) {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
index 7f75fff..8dc94d9 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
@@ -28,7 +28,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index 8886485..408f4be 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -26,7 +26,7 @@ import org.apache.ratis.grpc.client.GrpcOutputStream;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.SizeInBytes;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 7026b33..0568581 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -25,7 +25,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.JavaUtils;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogIndex.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogIndex.java
similarity index 98%
rename from
ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogIndex.java
rename to
ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogIndex.java
index 8636b5a..ac65522 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogIndex.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogIndex.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog;
import org.apache.ratis.BaseTest;
import org.junit.Assert;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
similarity index 92%
rename from
ratis-test/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
rename to
ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index 58e319d..81ad927 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,8 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.ratis.server;
+package org.apache.ratis.server.raftlog;
import com.codahale.metrics.Timer;
import org.apache.log4j.Level;
@@ -27,7 +26,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.metrics.RatisMetricsRegistry;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
-import org.apache.ratis.server.storage.RaftStorageTestUtils;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.JavaUtils;
@@ -95,12 +94,12 @@ public class TestRaftLogMetrics extends BaseTest
// For followers, flush can be lagged behind. Attempt multiple times.
for(RaftServerImpl f : cluster.getFollowers()) {
- JavaUtils.attempt(() -> assertFlushCount(f), 10, 100, f.getId() +
"-assertFlushCount", null);
+ JavaUtils.attempt(() -> assertFlushCount(f), 10, HUNDRED_MILLIS,
f.getId() + "-assertFlushCount", null);
}
}
static void assertFlushCount(RaftServerImpl server) throws Exception {
- final String flushTimeMetric =
RaftStorageTestUtils.getLogFlushTimeMetric(server.getId());
+ final String flushTimeMetric =
SegmentedRaftLogTestUtils.getLogFlushTimeMetric(server.getId());
Timer tm =
RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric);
Assert.assertNotNull(tm);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
similarity index 95%
rename from
ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
rename to
ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index d84de42..8595246 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
@@ -28,9 +28,10 @@ import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.impl.ServerState;
-import
org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
-import org.apache.ratis.server.storage.RaftLogCache.LogSegmentList;
-import org.apache.ratis.server.storage.TestSegmentedRaftLog.SegmentRange;
+import
org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
+import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.LogSegmentList;
+import
org.apache.ratis.server.raftlog.segmented.TestSegmentedRaftLog.SegmentRange;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.SizeInBytes;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
similarity index 93%
rename from
ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
rename to
ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 270e279..5ab6a6c 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
@@ -23,7 +23,9 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
+import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecordWithEntry;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
@@ -47,19 +49,19 @@ import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import static
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-import static org.apache.ratis.server.storage.LogSegment.getEntrySize;
+import static
org.apache.ratis.server.raftlog.segmented.LogSegment.getEntrySize;
/**
* Test basic functionality of {@link LogSegment}
*/
-public class TestRaftLogSegment extends BaseTest {
+public class TestLogSegment extends BaseTest {
private File storageDir;
private long segmentMaxSize;
private long preallocatedSize;
private int bufferSize;
@Before
- public void setup() throws Exception {
+ public void setup() {
RaftProperties properties = new RaftProperties();
storageDir = getTestDir();
RaftServerConfigKeys.setStorageDirs(properties,
Collections.singletonList(storageDir));
@@ -89,7 +91,7 @@ public class TestRaftLogSegment extends BaseTest {
storage.getStorageDir().getClosedLogFile(startIndex, startIndex +
numEntries - 1);
final LogEntryProto[] entries = new LogEntryProto[numEntries];
- try (LogOutputStream out = new LogOutputStream(file, false,
+ try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(file, false,
segmentMaxSize, preallocatedSize, bufferSize)) {
for (int i = 0; i < entries.length; i++) {
SimpleOperation op = new SimpleOperation("m" + i);
@@ -283,12 +285,10 @@ public class TestRaftLogSegment extends BaseTest {
// make sure preallocation is correct with different max/pre-allocated size
for (int max : maxSizes) {
for (int a : preallocated) {
- try (LogOutputStream ignored =
- new LogOutputStream(file, false, max, a, bufferSize)) {
+ try(SegmentedRaftLogOutputStream ignored = new
SegmentedRaftLogOutputStream(file, false, max, a, bufferSize)) {
Assert.assertEquals("max=" + max + ", a=" + a, file.length(),
Math.min(max, a));
}
- try (LogInputStream in =
- new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
+ try(SegmentedRaftLogInputStream in = new
SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
LogEntryProto entry = in.nextEntry();
Assert.assertNull(entry);
}
@@ -299,7 +299,7 @@ public class TestRaftLogSegment extends BaseTest {
final byte[] content = new byte[1024 * 2];
Arrays.fill(content, (byte) 1);
final long size;
- try (LogOutputStream out = new LogOutputStream(file, false,
+ try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(file, false,
1024, 1024, bufferSize)) {
SimpleOperation op = new SimpleOperation(new String(content));
LogEntryProto entry =
ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
@@ -308,7 +308,7 @@ public class TestRaftLogSegment extends BaseTest {
}
Assert.assertEquals(file.length(),
size + SegmentedRaftLogFormat.getHeaderLength());
- try (LogInputStream in = new LogInputStream(file, 0,
+ try (SegmentedRaftLogInputStream in = new
SegmentedRaftLogInputStream(file, 0,
INVALID_LOG_INDEX, true)) {
LogEntryProto entry = in.nextEntry();
Assert.assertArrayEquals(content,
@@ -334,7 +334,7 @@ public class TestRaftLogSegment extends BaseTest {
long totalSize = SegmentedRaftLogFormat.getHeaderLength();
long preallocated = 16 * 1024;
- try (LogOutputStream out = new LogOutputStream(file, false,
+ try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(file, false,
max.getSize(), 16 * 1024, 10 * 1024)) {
Assert.assertEquals(preallocated, file.length());
while (totalSize + entrySize < max.getSize()) {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
similarity index 87%
rename from
ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
rename to
ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 7d9fdf5..7edf1c0 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
@@ -25,6 +25,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.FileUtils;
@@ -43,7 +44,7 @@ import java.util.Collections;
import java.util.List;
/**
- * Test basic functionality of LogReader, LogInputStream, and LogOutputStream.
+ * Test basic functionality of LogReader, SegmentedRaftLogInputStream, and
SegmentedRaftLogOutputStream.
*/
public class TestRaftLogReadWrite extends BaseTest {
private File storageDir;
@@ -74,8 +75,7 @@ public class TestRaftLogReadWrite extends BaseTest {
private LogEntryProto[] readLog(File file, long startIndex, long endIndex,
boolean isOpen) throws IOException {
List<LogEntryProto> list = new ArrayList<>();
- try (LogInputStream in =
- new LogInputStream(file, startIndex, endIndex, isOpen)) {
+ try (SegmentedRaftLogInputStream in = new
SegmentedRaftLogInputStream(file, startIndex, endIndex, isOpen)) {
LogEntryProto entry;
while ((entry = in.nextEntry()) != null) {
list.add(entry);
@@ -84,7 +84,7 @@ public class TestRaftLogReadWrite extends BaseTest {
return list.toArray(new LogEntryProto[list.size()]);
}
- private long writeMessages(LogEntryProto[] entries, LogOutputStream out)
+ private long writeMessages(LogEntryProto[] entries,
SegmentedRaftLogOutputStream out)
throws IOException {
long size = 0;
for (int i = 0; i < entries.length; i++) {
@@ -107,9 +107,8 @@ public class TestRaftLogReadWrite extends BaseTest {
long size = SegmentedRaftLogFormat.getHeaderLength();
final LogEntryProto[] entries = new LogEntryProto[100];
- try (LogOutputStream out =
- new LogOutputStream(openSegment, false, segmentMaxSize,
- preallocatedSize, bufferSize)) {
+ try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
+ segmentMaxSize, preallocatedSize, bufferSize)) {
size += writeMessages(entries, out);
} finally {
storage.close();
@@ -127,9 +126,8 @@ public class TestRaftLogReadWrite extends BaseTest {
final RaftStorage storage = new RaftStorage(storageDir,
StartupOption.REGULAR);
File openSegment = storage.getStorageDir().getOpenLogFile(0);
LogEntryProto[] entries = new LogEntryProto[200];
- try (LogOutputStream out =
- new LogOutputStream(openSegment, false, segmentMaxSize,
- preallocatedSize, bufferSize)) {
+ try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
+ segmentMaxSize, preallocatedSize, bufferSize)) {
for (int i = 0; i < 100; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(),
0, i);
@@ -137,9 +135,8 @@ public class TestRaftLogReadWrite extends BaseTest {
}
}
- try (LogOutputStream out =
- new LogOutputStream(openSegment, true, segmentMaxSize,
- preallocatedSize, bufferSize)) {
+ try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, true,
+ segmentMaxSize, preallocatedSize, bufferSize)) {
for (int i = 100; i < 200; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(),
0, i);
@@ -165,7 +162,7 @@ public class TestRaftLogReadWrite extends BaseTest {
long size = SegmentedRaftLogFormat.getHeaderLength();
LogEntryProto[] entries = new LogEntryProto[100];
- LogOutputStream out = new LogOutputStream(openSegment, false,
+ final SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
segmentMaxSize, preallocatedSize, bufferSize);
size += writeMessages(entries, out);
out.flush();
@@ -194,7 +191,7 @@ public class TestRaftLogReadWrite extends BaseTest {
File openSegment = storage.getStorageDir().getOpenLogFile(0);
LogEntryProto[] entries = new LogEntryProto[10];
- LogOutputStream out = new LogOutputStream(openSegment, false,
+ final SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize);
for (int i = 0; i < 10; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
@@ -213,7 +210,7 @@ public class TestRaftLogReadWrite extends BaseTest {
}
List<LogEntryProto> list = new ArrayList<>();
- try (LogInputStream in = new LogInputStream(openSegment, 0,
+ try (SegmentedRaftLogInputStream in = new
SegmentedRaftLogInputStream(openSegment, 0,
RaftServerConstants.INVALID_LOG_INDEX, true)) {
LogEntryProto entry;
while ((entry = in.nextEntry()) != null) {
@@ -241,9 +238,8 @@ public class TestRaftLogReadWrite extends BaseTest {
public void testReadWithEntryCorruption() throws IOException {
RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
File openSegment = storage.getStorageDir().getOpenLogFile(0);
- try (LogOutputStream out =
- new LogOutputStream(openSegment, false, segmentMaxSize,
- preallocatedSize, bufferSize)) {
+ try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(openSegment, false,
+ segmentMaxSize, preallocatedSize, bufferSize)) {
for (int i = 0; i < 100; i++) {
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
similarity index 96%
rename from
ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
rename to
ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 8007f4d..0b2700a 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
@@ -31,6 +31,8 @@ import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -64,8 +66,8 @@ import static org.mockito.Mockito.when;
public class TestSegmentedRaftLog extends BaseTest {
static {
- LogUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG);
- LogUtils.setLogLevel(RaftLogCache.LOG, Level.TRACE);
+ LogUtils.setLogLevel(SegmentedRaftLogWorker.LOG, Level.DEBUG);
+ LogUtils.setLogLevel(SegmentedRaftLogCache.LOG, Level.TRACE);
LogUtils.setLogLevel(SegmentedRaftLog.LOG, Level.TRACE);
}
@@ -122,7 +124,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final int size = (int) (range.end - range.start + 1);
LogEntryProto[] entries = new LogEntryProto[size];
- try (LogOutputStream out = new LogOutputStream(file, false,
+ try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(file, false,
segmentMaxSize, preallocatedSize, bufferSize)) {
for (int i = 0; i < size; i++) {
SimpleOperation m = new SimpleOperation("m" + (i + range.start));
@@ -451,7 +453,7 @@ public class TestSegmentedRaftLog extends BaseTest {
Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
raftLog.getLatestFlushedIndex());
- RaftLogCache cache = raftLog.getRaftLogCache();
+ SegmentedRaftLogCache cache = raftLog.getRaftLogCache();
Assert.assertEquals(5, cache.getNumOfSegments());
}
}
@@ -512,13 +514,13 @@ public class TestSegmentedRaftLog extends BaseTest {
try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm,
null, storage, -1, properties)) {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
- raftLog.appendEntry(entry); // RaftLogWorker should catch
TimeoutIOException
+ raftLog.appendEntry(entry); // SegmentedRaftLogWorker should catch
TimeoutIOException
JavaUtils.attempt(() -> {
final ExitUtils.ExitException exitException =
ExitUtils.getFirstExitException();
Objects.requireNonNull(exitException, "exitException == null");
Assert.assertEquals(TimeoutIOException.class,
exitException.getCause().getClass());
- }, 3*numRetries, syncTimeout, "RaftLogWorker should catch
TimeoutIOException and exit", LOG);
+ }, 3*numRetries, syncTimeout, "SegmentedRaftLogWorker should catch
TimeoutIOException and exit", LOG);
ExitUtils.clear();
}
}
@@ -538,7 +540,7 @@ public class TestSegmentedRaftLog extends BaseTest {
void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex,
long expectedNextIndex) throws Exception {
JavaUtils.attempt(() -> assertIndices(raftLog, expectedFlushIndex,
expectedNextIndex),
- 10, 100, "assertIndices", LOG);
+ 10, HUNDRED_MILLIS, "assertIndices", LOG);
}
@Test
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
similarity index 97%
rename from
ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
rename to
ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index a1157e9..0ce0821 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,31 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.raftlog.segmented;
import java.io.IOException;
import java.util.Iterator;
-import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
+import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-public class TestRaftLogCache {
+public class TestSegmentedRaftLogCache {
private static final RaftProperties prop = new RaftProperties();
- private RaftLogCache cache;
+ private SegmentedRaftLogCache cache;
@Before
public void setup() {
- cache = new RaftLogCache(null, null, prop);
+ cache = new SegmentedRaftLogCache(null, null, prop);
}
private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {