jacek-lewandowski commented on code in PR #2894:
URL: https://github.com/apache/cassandra/pull/2894#discussion_r1404236037


##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import com.sun.nio.file.ExtendedOpenOption;
+import net.openhft.chronicle.core.util.ThrowingFunction;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SimpleCachedBufferPool;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import sun.nio.ch.DirectBuffer;
+
+/*
+ * Direct-IO segment. Allocates the ByteBuffer using ByteBuffer.allocateDirect and
+ * aligns ByteBuffer.position, ByteBuffer.limit and FileChannel.position to the
+ * page size (4K). Java 11 forces a minimum of one page size to be written to
+ * disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+    private final int fsBlockSize;
+    private final int fsBlockQuotientMask;
+    private final int fsBlockRemainderMask;
+
+    // Needed to track the number of bytes written to disk in multiples of the page size.
+    long lastWritten = 0;
+
+    /**
+     * Constructs a new segment file.
+     */
+    DirectIOSegment(AbstractCommitLogSegmentManager manager, 
ThrowingFunction<Path, FileChannel, IOException> channelFactory, int 
fsBlockSize)
+    {
+        super(manager, channelFactory);
+
+        assert Integer.highestOneBit(fsBlockSize) == fsBlockSize : 
"fsBlockSize must be a power of 2";
+
+        // mark the initial sync marker as uninitialised
+        int firstSync = buffer.position();
+        buffer.putInt(firstSync + 0, 0);
+        buffer.putInt(firstSync + 4, 0);
+
+        this.fsBlockSize = fsBlockSize;
+        this.fsBlockRemainderMask = fsBlockSize - 1;
+        this.fsBlockQuotientMask = ~fsBlockRemainderMask;
+    }
+
+    @Override
+    void writeLogHeader()
+    {
+        super.writeLogHeader();
+        // Testing shows that writing the initial bytes takes some time for Direct I/O.
+        // During peak load, it is better to have the "COMMIT-LOG-ALLOCATOR" thread write
+        // these few bytes of each file; this helps the syncer thread speed up flushing.
+        flush(0, lastSyncedOffset);
+    }
+
+    @Override
+    void write(int startMarker, int nextMarker)
+    {
+        // if there's room in the discard section to write an empty header,
+        // zero out the next sync marker so replayer can cleanly exit
+        if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+        {
+            buffer.putInt(nextMarker, 0);
+            buffer.putInt(nextMarker + 4, 0);
+        }
+
+        // write previous sync marker to point to next sync marker
+        // we don't chain the CRCs here to ensure this method is idempotent if it fails
+        writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+    }
+
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
+            // lastSyncedOffset is synced to disk. Align lastSyncedOffset to the start of
+            // its block and nextMarker to the end of its block to avoid write errors.
+            int flushPosition = lastSyncedOffset;
+            ByteBuffer duplicate = buffer.duplicate();
+
+            // Align the file position if it is not aligned to the start of a block.
+            if ((flushPosition & fsBlockRemainderMask) != 0)
+            {
+                flushPosition = flushPosition & fsBlockQuotientMask;
+                channel.position(flushPosition);
+            }
+            duplicate.position(flushPosition);
+
+            int flushLimit = nextMarker;
+
+            // Align last byte to end of block.
+            if ((flushLimit & fsBlockRemainderMask) != 0)
+                flushLimit = (flushLimit + fsBlockSize) & fsBlockQuotientMask;

Review Comment:
   I'll use the first one; I'm not sure whether the second one would work
without the "if" — it looks like it would unnecessarily move the flush limit
forward when it is already aligned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to