Maxwell-Guo commented on code in PR #2894:
URL: https://github.com/apache/cassandra/pull/2894#discussion_r1403843903
##########
src/java/org/apache/cassandra/db/commitlog/CommitLog.java:
##########
@@ -684,5 +694,23 @@ public EncryptionContext getEncryptionContext()
{
return encryptionContext;
}
+
+ /**
+ * Returns Direct-IO/non-buffer used for CommitLog IO.
Review Comment:
I think we can delete this description line, since it just repeats the `@return` line below.
##########
src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java:
##########
@@ -20,30 +20,37 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.*;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.zip.CRC32;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.FileWriter;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
Review Comment:
It seems the import order is wrong:
1. `import org.cliffc.high_scale_lib.NonBlockingHashMap;` should come after `import org.apache.cassandra.utils.concurrent.WaitQueue;`
2. `import com.codahale.metrics.Timer;` should come before `import com.google.common.annotations.VisibleForTesting;`
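For reference, the ordering I'd expect would look roughly like this (only the imports mentioned above; the rest are elided):

```java
import com.codahale.metrics.Timer;                       // before com.google.*
import com.google.common.annotations.VisibleForTesting;
// ... other third-party and org.apache.cassandra.* imports ...
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.cliffc.high_scale_lib.NonBlockingHashMap;     // after org.apache.cassandra.*
```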
##########
src/java/org/apache/cassandra/db/commitlog/CommitLog.java:
##########
@@ -684,5 +694,23 @@ public EncryptionContext getEncryptionContext()
{
return encryptionContext;
}
+
+ /**
+ * Returns Direct-IO/non-buffer used for CommitLog IO.
+ * @return Direct-IO used for CommitLog IO
+ */
+ public boolean isDirectIOEnabled()
+ {
+ return diskAccessMode == Config.DiskAccessMode.direct;
+ }
+
+ /**
+ * Returns Standard or buffered I/O used for CommitLog IO.
Review Comment:
I think we can delete this description line, since it just repeats the `@return` line below.
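For example, for `isDirectIOEnabled()` the Javadoc would simply become (the same applies to the standard/buffered I/O getter below):

```java
/**
 * @return Direct-IO used for CommitLog IO
 */
public boolean isDirectIOEnabled()
{
    return diskAccessMode == Config.DiskAccessMode.direct;
}
```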
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import com.sun.nio.file.ExtendedOpenOption;
+import net.openhft.chronicle.core.util.ThrowingFunction;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SimpleCachedBufferPool;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import sun.nio.ch.DirectBuffer;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ private final int fsBlockSize;
+ private final int fsBlockQuotientMask;
+ private final int fsBlockRemainderMask;
+
+    // Needed to track number of bytes written to disk in multiple of page size.
+ long lastWritten = 0;
+
+ /**
+ * Constructs a new segment file.
+ */
+    DirectIOSegment(AbstractCommitLogSegmentManager manager, ThrowingFunction<Path, FileChannel, IOException> channelFactory, int fsBlockSize)
+    {
+        super(manager, channelFactory);
+
+        assert Integer.highestOneBit(fsBlockSize) == fsBlockSize : "fsBlockSize must be a power of 2";
+
+ // mark the initial sync marker as uninitialised
+ int firstSync = buffer.position();
+ buffer.putInt(firstSync + 0, 0);
+ buffer.putInt(firstSync + 4, 0);
+
+ this.fsBlockSize = fsBlockSize;
+ this.fsBlockRemainderMask = fsBlockSize - 1;
+ this.fsBlockQuotientMask = ~fsBlockRemainderMask;
+ }
+
+ @Override
+ void writeLogHeader()
+ {
+ super.writeLogHeader();
+        // Testing shows writing initial bytes takes some time for Direct I/O. During peak load,
+        // it is better to make "COMMIT-LOG-ALLOCATOR" thread to write these few bytes of each
+        // file and this helps syncer thread to speedup the flush activity.
+ flush(0, lastSyncedOffset);
+ }
+
+ @Override
+ void write(int startMarker, int nextMarker)
+ {
+ // if there's room in the discard section to write an empty header,
+ // zero out the next sync marker so replayer can cleanly exit
+ if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+ {
+ buffer.putInt(nextMarker, 0);
+ buffer.putInt(nextMarker + 4, 0);
+ }
+
+ // write previous sync marker to point to next sync marker
+        // we don't chain the crcs here to ensure this method is idempotent if it fails
+ writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+ try
+ {
+            // lastSyncedOffset is synced to disk. Align lastSyncedOffset to start of its block
+            // and nextMarker to end of its block to avoid write errors.
+ int flushPosition = lastSyncedOffset;
+ ByteBuffer duplicate = buffer.duplicate();
+
+ // Aligned file position if not aligned to start of a block.
+ if ((flushPosition & fsBlockRemainderMask) != 0)
+ {
+ flushPosition = flushPosition & fsBlockQuotientMask;
+ channel.position(flushPosition);
+ }
+ duplicate.position(flushPosition);
+
+ int flushLimit = nextMarker;
+
+ // Align last byte to end of block.
+ if ((flushLimit & fsBlockRemainderMask) != 0)
+ flushLimit = (flushLimit + fsBlockSize) & fsBlockQuotientMask;
+
+ duplicate.limit(flushLimit);
+
+ channel.write(duplicate);
+
+            // Direct I/O always writes flushes in block size and writes more than the flush size.
+            // File size on disk will always multiple of block size and taking this into account
+            // helps testcases to pass. Avoid counting same block more than once.
+ if (flushLimit > lastWritten)
+ {
+ manager.addSize(flushLimit - lastWritten);
+ lastWritten = flushLimit;
+ }
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ @Override
+ public long onDiskSize()
+ {
+ return lastWritten;
+ }
+
+ @Override
+ protected void internalClose()
+ {
+ try
+ {
+ manager.getBufferPool().releaseBuffer(buffer);
+ super.internalClose();
+ }
+ finally
+ {
+ manager.notifyBufferFreed();
+ }
+ }
+
+    protected static class DirectIOSegmentBuilder extends CommitLogSegment.Builder<DirectIOSegment>
+ {
+ public final int fsBlockSize;
+
+        public DirectIOSegmentBuilder(AbstractCommitLogSegmentManager segmentManager)
+        {
+            super(segmentManager);
+            this.fsBlockSize = FileUtils.getBlockSize(new File(segmentManager.storageDirectory));
+ }
+
+ @Override
+ public DirectIOSegment build()
+ {
+            return new DirectIOSegment(segmentManager,
+                                       path -> FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE, ExtendedOpenOption.DIRECT),
+                                       fsBlockSize);
+ }
+
+ @Override
+ public SimpleCachedBufferPool createBufferPool()
+ {
+            // The direct buffer must be aligned with the file system block size. We cannot enforce that during
+            // allocation, but we can get an aligned slice from the allocated buffer. The buffer must be oversized by the
+            // alignment unit to make it possible.
+            return new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(),
+                                              DatabaseDescriptor.getCommitLogSegmentSize() + fsBlockSize,
+                                              BufferType.OFF_HEAP) {
+ @Override
+ public ByteBuffer createBuffer()
+ {
+                    int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
+
+ ByteBuffer original = super.createBuffer();
+
+                    // May get previously used buffer and zero it out to now. Direct I/O writes additional bytes during
+                    // flush operation
+                    ByteBufferUtil.writeZeroes(original.duplicate(), original.limit());
+
+                    ByteBuffer alignedBuffer = original.alignedSlice(fsBlockSize);
+                    assert alignedBuffer.limit() >= segmentSize : String.format("Bytebuffer slicing failed to get required buffer size (required=%d,current size=%d", segmentSize, alignedBuffer.limit());
Review Comment:
nit: `required=%d,current size=%d` -> `required=%d, current size=%d`
##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -1415,6 +1426,52 @@ public static void applyPartitioner(Config conf)
paritionerName = partitioner.getClass().getCanonicalName();
}
+    private static DiskAccessMode resolveCommitLogWriteDiskAccessMode(DiskAccessMode providedDiskAccessMode)
+    {
+        boolean compressOrEncrypt = getCommitLogCompression() != null || (getEncryptionContext() != null && getEncryptionContext().isEnabled());
+        boolean directIOSupported = false;
+        try
+        {
+            directIOSupported = FileUtils.getBlockSize(new File(getCommitLogLocation())) > 0;
Review Comment:
My question here is: what is the advantage of using this method to determine Direct-IO support, rather than directly checking the JDK version?
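For context, the JDK-version check I have in mind would be roughly the following (just a hypothetical sketch, not tested):

```java
// Hypothetical alternative: gate Direct-IO support on the runtime version,
// since ExtendedOpenOption.DIRECT needs a sufficiently recent JDK.
boolean directIOSupported = Runtime.version().feature() >= 11;
```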
##########
src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java:
##########
@@ -41,15 +48,10 @@ public class CompressedSegment extends FileDirectSegment
/**
* Constructs a new segment file.
*/
-    CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+    CompressedSegment(AbstractCommitLogSegmentManager manager, ThrowingFunction<Path, FileChannel, IOException> channelFactory)
{
- super(commitLog, manager);
- this.compressor = commitLog.configuration.getCompressor();
- }
-
- ByteBuffer createBuffer(CommitLog commitLog)
- {
- return manager.getBufferPool().createBuffer();
+ super(manager, channelFactory);
+ this.compressor = manager.commitLog.configuration.getCompressor();
Review Comment:
What about just adding a method to AbstractCommitLogSegmentManager called getCompressor() that returns commitLog.configuration.getCompressor()?
I'm not very insistent on this; it's okay as it is for now.
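Something like this is what I have in mind (just a sketch, assuming the compressor type is ICompressor):

```java
// In AbstractCommitLogSegmentManager (sketch of the suggested helper):
ICompressor getCompressor()
{
    return commitLog.configuration.getCompressor();
}
```

Then the constructor here could simply call `manager.getCompressor()`.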
##########
test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java:
##########
@@ -281,8 +292,10 @@ private void verifySizes(CommitLog commitLog)
commitLog.segmentManager.awaitManagementTasksCompletion();
long combinedSize = 0;
-        for (File f : new File(commitLog.segmentManager.storageDirectory).tryList())
+        for (File f : new File(commitLog.segmentManager.storageDirectory).tryList()) {
Review Comment:
nit: `{` should be moved to a new line
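i.e. something like:

```java
for (File f : new File(commitLog.segmentManager.storageDirectory).tryList())
{
    // ...
}
```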
##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -1415,6 +1426,52 @@ public static void applyPartitioner(Config conf)
paritionerName = partitioner.getClass().getCanonicalName();
}
+    private static DiskAccessMode resolveCommitLogWriteDiskAccessMode(DiskAccessMode providedDiskAccessMode)
+    {
+        boolean compressOrEncrypt = getCommitLogCompression() != null || (getEncryptionContext() != null && getEncryptionContext().isEnabled());
+        boolean directIOSupported = false;
+        try
+        {
+            directIOSupported = FileUtils.getBlockSize(new File(getCommitLogLocation())) > 0;
+        }
+        catch (RuntimeException e)
+        {
+            logger.info("Unable to determine block size for commit log directory: {}", e.getMessage());
Review Comment:
What about changing the log level to warn?
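i.e.:

```java
logger.warn("Unable to determine block size for commit log directory: {}", e.getMessage());
```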
##########
src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java:
##########
@@ -92,4 +94,27 @@ public long onDiskSize()
{
return lastWrittenPos;
}
+
+    protected static class CompressedSegmentBuilder extends CommitLogSegment.Builder<CompressedSegment>
+    {
+        public CompressedSegmentBuilder(AbstractCommitLogSegmentManager segmentManager)
+        {
+            super(segmentManager);
+ }
+
+ @Override
+ public CompressedSegment build()
+ {
+            return new CompressedSegment(segmentManager,
+                                          path -> FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE));
+ }
+
+ @Override
+ public SimpleCachedBufferPool createBufferPool()
+ {
+            return new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(),
+                                              DatabaseDescriptor.getCommitLogSegmentSize(),
+                                              segmentManager.commitLog.configuration.getCompressor().preferredBufferType());
Review Comment:
same with the upper comment for CompressedSegment construct function
##########
test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java:
##########
@@ -805,4 +810,107 @@ public void testInvalidSub1DefaultRFs() throws IllegalArgumentException
{
DatabaseDescriptor.setDefaultKeyspaceRF(0);
}
+
+ @Test
+ public void testCommitLogDiskAccessMode() throws IOException
+ {
+        ParameterizedClass savedCompression = DatabaseDescriptor.getCommitLogCompression();
+        EncryptionContext savedEncryptionContexg = DatabaseDescriptor.getEncryptionContext();
+        Config.DiskAccessMode savedCommitLogDOS = DatabaseDescriptor.getCommitLogWriteDiskAccessMode();
+        String savedCommitLogLocation = DatabaseDescriptor.getCommitLogLocation();
+
+        try
+        {
+            // block size available
+            DatabaseDescriptor.setCommitLogLocation(Files.createTempDirectory("testCommitLogDiskAccessMode").toString());
+
+ // no encryption or compression
+ DatabaseDescriptor.setCommitLogCompression(null);
+ DatabaseDescriptor.setEncryptionContext(null);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.spinning;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.mmap, Config.DiskAccessMode.mmap, Config.DiskAccessMode.mmap, Config.DiskAccessMode.direct);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.ssd;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.mmap, Config.DiskAccessMode.direct, Config.DiskAccessMode.mmap, Config.DiskAccessMode.direct);
+
+            // compression enabled
+            DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", null));
+            DatabaseDescriptor.setEncryptionContext(null);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.spinning;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.standard, Config.DiskAccessMode.standard, Config.DiskAccessMode.standard);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.ssd;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.standard, Config.DiskAccessMode.standard, Config.DiskAccessMode.standard);
+
+ // encryption enabled
+ DatabaseDescriptor.setCommitLogCompression(null);
+            DatabaseDescriptor.setEncryptionContext(new EncryptionContext(EncryptionContextGenerator.createEncryptionOptions()));
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.spinning;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.standard, Config.DiskAccessMode.standard, Config.DiskAccessMode.standard);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.ssd;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.standard, Config.DiskAccessMode.standard, Config.DiskAccessMode.standard);
+
+ // block size not available
+ DatabaseDescriptor.setCommitLogLocation(null);
+
+ // no encryption or compression
+ DatabaseDescriptor.setCommitLogCompression(null);
+ DatabaseDescriptor.setEncryptionContext(null);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.spinning;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.mmap, Config.DiskAccessMode.mmap, Config.DiskAccessMode.mmap, Config.DiskAccessMode.direct);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.ssd;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.mmap, Config.DiskAccessMode.mmap, Config.DiskAccessMode.mmap, Config.DiskAccessMode.direct);
+
+ // compression enabled
+            DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", null));
+            DatabaseDescriptor.setEncryptionContext(null);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.spinning;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.standard, Config.DiskAccessMode.standard, Config.DiskAccessMode.standard);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.ssd;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.standard, Config.DiskAccessMode.standard, Config.DiskAccessMode.standard);
+
+ // encryption enabled
+ DatabaseDescriptor.setCommitLogCompression(null);
+            DatabaseDescriptor.setEncryptionContext(new EncryptionContext(EncryptionContextGenerator.createEncryptionOptions()));
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.spinning;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.standard, Config.DiskAccessMode.standard, Config.DiskAccessMode.standard);
+            DatabaseDescriptor.getRawConfig().disk_optimization_strategy = Config.DiskOptimizationStrategy.ssd;
+            assertCommitLogDiskAccessModes(Config.DiskAccessMode.standard, Config.DiskAccessMode.standard, Config.DiskAccessMode.standard);
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitLogCompression(savedCompression);
+ DatabaseDescriptor.setEncryptionContext(savedEncryptionContexg);
+            DatabaseDescriptor.setCommitLogWriteDiskAccessMode(savedCommitLogDOS);
+ DatabaseDescriptor.setCommitLogLocation(savedCommitLogLocation);
+ }
+ }
+
+    private void assertCommitLogDiskAccessModes(Config.DiskAccessMode expectedLegacy, Config.DiskAccessMode expectedAuto, Config.DiskAccessMode... allowedModesArray)
+    {
+        EnumSet<Config.DiskAccessMode> allowedModes = EnumSet.copyOf(Arrays.asList(allowedModesArray));
+ allowedModes.add(Config.DiskAccessMode.legacy);
+ allowedModes.add(Config.DiskAccessMode.auto);
+
+        EnumSet<Config.DiskAccessMode> disallowedModes = EnumSet.complementOf(allowedModes);
+
+ for (Config.DiskAccessMode mode : disallowedModes)
+ {
+ DatabaseDescriptor.setCommitLogWriteDiskAccessMode(mode);
+            assertThatExceptionOfType(ConfigurationException.class).isThrownBy(DatabaseDescriptor::initializeCommitLogDiskAccessMode);
+ }
+
+ for (Config.DiskAccessMode mode : allowedModes)
+ {
+ DatabaseDescriptor.setCommitLogWriteDiskAccessMode(mode);
+            // logger.info("commit log directory {}", DatabaseDescriptor.getCommitLogLocation());
Review Comment:
What about deleting it if it's not needed any more?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]