This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f7a394c RATIS-587. FinalizeLogSegment should flush the contents of log.
f7a394c is described below
commit f7a394c9786e27b8aca9a9eedde967b8d4620643
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 17 11:21:45 2019 -0700
RATIS-587. FinalizeLogSegment should flush the contents of log.
---
.../raftlog/segmented/BufferedWriteChannel.java | 78 +++++-----------------
.../segmented/SegmentedRaftLogOutputStream.java | 9 +--
2 files changed, 20 insertions(+), 67 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index 3b302e7..86b45b5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -20,58 +20,28 @@ package org.apache.ratis.server.raftlog.segmented;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Provides a buffering layer in front of a FileChannel for writing.
+ *
+ * This class is NOT threadsafe.
*/
public class BufferedWriteChannel extends BufferedChannelBase {
- // The capacity of the write buffer.
- private final int writeCapacity;
- // The position of the file channel's write pointer.
- private AtomicLong writeBufferStartPosition = new AtomicLong(0);
// The buffer used to write operations.
private final ByteBuffer writeBuffer;
// The absolute position of the next write operation.
private volatile long position;
+ /** Are all the data already flushed? */
+ private boolean flushed = true;
public BufferedWriteChannel(FileChannel fc, int writeCapacity)
throws IOException {
super(fc);
- this.writeCapacity = writeCapacity;
this.position = fc.position();
- this.writeBufferStartPosition.set(position);
this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
}
/**
- * Write all the data in src to the {@link FileChannel}. Note that this function can
- * buffer or re-order writes based on the implementation. These writes will be flushed
- * to the disk only when flush() is invoked.
- *
- * @param src The source ByteBuffer which contains the data to be written.
- * @throws IOException if a write operation fails.
- */
- public void write(ByteBuffer src) throws IOException {
- int copied = 0;
- while (src.remaining() > 0) {
- int truncated = 0;
- if (writeBuffer.remaining() < src.remaining()) {
- truncated = src.remaining() - writeBuffer.remaining();
- src.limit(src.limit() - truncated);
- }
- copied += src.remaining();
- writeBuffer.put(src);
- src.limit(src.limit() + truncated);
- // if we have run out of buffer space, we should flush to the file
- if (writeBuffer.remaining() == 0) {
- flushInternal();
- }
- }
- position += copied;
- }
-
- /**
* Write the specified byte.
* @param b the byte to be written
*/
@@ -80,6 +50,7 @@ public class BufferedWriteChannel extends BufferedChannelBase {
if (writeBuffer.remaining() == 0) {
flushInternal();
}
+ flushed = false;
position++;
}
@@ -93,6 +64,7 @@ public class BufferedWriteChannel extends BufferedChannelBase {
flushInternal();
}
}
+ flushed = false;
position += b.length;
}
@@ -104,26 +76,19 @@ public class BufferedWriteChannel extends BufferedChannelBase {
}
/**
- * Get the position of the file channel's write pointer.
- */
- public long getFileChannelPosition() {
- return writeBufferStartPosition.get();
- }
-
-
- /**
- * Write any data in the buffer to the file. If sync is set to true, force a
+ * Write any data in the buffer to the file and force a
* sync operation so that data is persisted to the disk.
*
* @throws IOException if the write or sync operation fails.
*/
- public void flush(boolean shouldForceWrite) throws IOException {
- synchronized (this) {
- flushInternal();
- }
- if (shouldForceWrite) {
- forceWrite(false);
+ void flush() throws IOException {
+ if (flushed) {
+ return; // flushed previously
}
+
+ flushInternal();
+ fileChannel.force(false);
+ flushed = true;
}
/**
@@ -133,22 +98,15 @@ public class BufferedWriteChannel extends BufferedChannelBase {
* @throws IOException if the write fails.
*/
private void flushInternal() throws IOException {
+ if (writeBuffer.position() == 0) {
+ return; // nothing to flush
+ }
+
writeBuffer.flip();
do {
fileChannel.write(writeBuffer);
} while (writeBuffer.hasRemaining());
writeBuffer.clear();
- writeBufferStartPosition.set(fileChannel.position());
- }
-
- public long forceWrite(boolean forceMetadata) throws IOException {
- // This is the point up to which we had flushed to the file system page cache
- // before issuing this force write hence is guaranteed to be made durable by
- // the force write, any flush that happens after this may or may
- // not be flushed
- long positionForceWrite = writeBufferStartPosition.get();
- fileChannel.force(forceMetadata);
- return positionForceWrite;
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index a18aad5..d042f33 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -64,11 +64,6 @@ public class SegmentedRaftLogOutputStream implements Closeable {
this.segmentMaxSize = segmentMaxSize;
this.preallocatedSize = preallocatedSize;
RandomAccessFile rp = new RandomAccessFile(file, "rw");
- fc = rp.getChannel();
- fc.position(fc.size());
- preallocatedPos = fc.size();
- out = new BufferedWriteChannel(fc, bufferSize);
-
try {
fc = rp.getChannel();
fc.position(fc.size());
@@ -143,7 +138,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
@Override
public void close() throws IOException {
try {
- out.flush(false);
+ out.flush();
if (fc != null && fc.isOpen()) {
fc.truncate(fc.position());
}
@@ -162,7 +157,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
if (out == null) {
throw new IOException("Trying to use aborted output stream");
}
- out.flush(true);
+ out.flush();
}
private void preallocate() throws IOException {