This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f7a394c  RATIS-587. FinalizeLogSegment should flush the contents of 
log.
f7a394c is described below

commit f7a394c9786e27b8aca9a9eedde967b8d4620643
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 17 11:21:45 2019 -0700

    RATIS-587. FinalizeLogSegment should flush the contents of log.
---
 .../raftlog/segmented/BufferedWriteChannel.java    | 78 +++++-----------------
 .../segmented/SegmentedRaftLogOutputStream.java    |  9 +--
 2 files changed, 20 insertions(+), 67 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index 3b302e7..86b45b5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -20,58 +20,28 @@ package org.apache.ratis.server.raftlog.segmented;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Provides a buffering layer in front of a FileChannel for writing.
+ *
+ * This class is NOT threadsafe.
  */
 public class BufferedWriteChannel extends BufferedChannelBase {
-  // The capacity of the write buffer.
-  private final int writeCapacity;
-  // The position of the file channel's write pointer.
-  private AtomicLong writeBufferStartPosition = new AtomicLong(0);
   // The buffer used to write operations.
   private final ByteBuffer writeBuffer;
   // The absolute position of the next write operation.
   private volatile long position;
+  /** Are all the data already flushed? */
+  private boolean flushed = true;
 
   public BufferedWriteChannel(FileChannel fc, int writeCapacity)
       throws IOException {
     super(fc);
-    this.writeCapacity = writeCapacity;
     this.position = fc.position();
-    this.writeBufferStartPosition.set(position);
     this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
   }
 
   /**
-   * Write all the data in src to the {@link FileChannel}. Note that this 
function can
-   * buffer or re-order writes based on the implementation. These writes will 
be flushed
-   * to the disk only when flush() is invoked.
-   *
-   * @param src The source ByteBuffer which contains the data to be written.
-   * @throws IOException if a write operation fails.
-   */
-  public void write(ByteBuffer src) throws IOException {
-    int copied = 0;
-    while (src.remaining() > 0) {
-      int truncated = 0;
-      if (writeBuffer.remaining() < src.remaining()) {
-        truncated = src.remaining() - writeBuffer.remaining();
-        src.limit(src.limit() - truncated);
-      }
-      copied += src.remaining();
-      writeBuffer.put(src);
-      src.limit(src.limit() + truncated);
-      // if we have run out of buffer space, we should flush to the file
-      if (writeBuffer.remaining() == 0) {
-        flushInternal();
-      }
-    }
-    position += copied;
-  }
-
-  /**
    * Write the specified byte.
    * @param b the byte to be written
    */
@@ -80,6 +50,7 @@ public class BufferedWriteChannel extends BufferedChannelBase 
{
     if (writeBuffer.remaining() == 0) {
       flushInternal();
     }
+    flushed = false;
     position++;
   }
 
@@ -93,6 +64,7 @@ public class BufferedWriteChannel extends BufferedChannelBase 
{
         flushInternal();
       }
     }
+    flushed = false;
     position += b.length;
   }
 
@@ -104,26 +76,19 @@ public class BufferedWriteChannel extends 
BufferedChannelBase {
   }
 
   /**
-   * Get the position of the file channel's write pointer.
-   */
-  public long getFileChannelPosition() {
-    return writeBufferStartPosition.get();
-  }
-
-
-  /**
-   * Write any data in the buffer to the file. If sync is set to true, force a
+   * Write any data in the buffer to the file and force a
    * sync operation so that data is persisted to the disk.
    *
    * @throws IOException if the write or sync operation fails.
    */
-  public void flush(boolean shouldForceWrite) throws IOException {
-    synchronized (this) {
-      flushInternal();
-    }
-    if (shouldForceWrite) {
-      forceWrite(false);
+  void flush() throws IOException {
+    if (flushed) {
+      return; // flushed previously
     }
+
+    flushInternal();
+    fileChannel.force(false);
+    flushed = true;
   }
 
   /**
@@ -133,22 +98,15 @@ public class BufferedWriteChannel extends 
BufferedChannelBase {
    * @throws IOException if the write fails.
    */
   private void flushInternal() throws IOException {
+    if (writeBuffer.position() == 0) {
+      return; // nothing to flush
+    }
+
     writeBuffer.flip();
     do {
       fileChannel.write(writeBuffer);
     } while (writeBuffer.hasRemaining());
     writeBuffer.clear();
-    writeBufferStartPosition.set(fileChannel.position());
-  }
-
-  public long forceWrite(boolean forceMetadata) throws IOException {
-    // This is the point up to which we had flushed to the file system page 
cache
-    // before issuing this force write hence is guaranteed to be made durable 
by
-    // the force write, any flush that happens after this may or may
-    // not be flushed
-    long positionForceWrite = writeBufferStartPosition.get();
-    fileChannel.force(forceMetadata);
-    return positionForceWrite;
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index a18aad5..d042f33 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -64,11 +64,6 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
     this.segmentMaxSize = segmentMaxSize;
     this.preallocatedSize = preallocatedSize;
     RandomAccessFile rp = new RandomAccessFile(file, "rw");
-    fc = rp.getChannel();
-    fc.position(fc.size());
-    preallocatedPos = fc.size();
-    out = new BufferedWriteChannel(fc, bufferSize);
-
     try {
       fc = rp.getChannel();
       fc.position(fc.size());
@@ -143,7 +138,7 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
   @Override
   public void close() throws IOException {
     try {
-      out.flush(false);
+      out.flush();
       if (fc != null && fc.isOpen()) {
         fc.truncate(fc.position());
       }
@@ -162,7 +157,7 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
     if (out == null) {
       throw new IOException("Trying to use aborted output stream");
     }
-    out.flush(true);
+    out.flush();
   }
 
   private void preallocate() throws IOException {

Reply via email to