Github user animeshtrivedi commented on a diff in the pull request:
https://github.com/apache/incubator-crail/pull/16#discussion_r180317997
--- Diff:
storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
---
@@ -19,208 +19,229 @@
package org.apache.crail.storage.nvmf.client;
-import com.ibm.disni.nvmef.NvmeCommand;
-import com.ibm.disni.nvmef.NvmeEndpoint;
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-import com.ibm.disni.nvmef.spdk.IOCompletion;
-
+import com.ibm.jnvmf.*;
import org.apache.crail.CrailBuffer;
+import org.apache.crail.CrailBufferCache;
+import org.apache.crail.CrailStatistics;
import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.memory.BufferCache;
import org.apache.crail.metadata.BlockInfo;
+import org.apache.crail.metadata.DataNodeInfo;
import org.apache.crail.storage.StorageEndpoint;
import org.apache.crail.storage.StorageFuture;
-import org.apache.crail.storage.nvmf.NvmfBufferCache;
import org.apache.crail.storage.nvmf.NvmfStorageConstants;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.*;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
public class NvmfStorageEndpoint implements StorageEndpoint {
private static final Logger LOG = CrailUtils.getLogger();
- private final InetSocketAddress inetSocketAddress;
- private final NvmeEndpoint endpoint;
- private final int sectorSize;
- private final BufferCache cache;
- private final BlockingQueue<NvmeCommand> freeCommands;
- private final NvmeCommand[] commands;
- private final NvmfStorageFuture[] futures;
- private final ThreadLocal<long[]> completed;
- private final int ioQeueueSize;
-
- public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress
inetSocketAddress) throws IOException {
- this.inetSocketAddress = inetSocketAddress;
- endpoint = group.createEndpoint();
+ private final Controller controller;
+ private final IoQueuePair queuePair;
+ private final int lbaDataSize;
+ private final long namespaceCapacity;
+ private final NvmfRegisteredBufferCache registeredBufferCache;
+ private final NvmfStagingBufferCache stagingBufferCache;
+ private final CrailStatistics statistics;
+
+ private final Queue<NvmWriteCommand> writeCommands;
+ private final Queue<NvmReadCommand> readCommands;
+
+ private final AtomicInteger outstandingOperations;
+
+ public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info,
CrailStatistics statistics,
+ CrailBufferCache
bufferCache) throws IOException {
+ InetSocketAddress inetSocketAddress = new InetSocketAddress(
+ InetAddress.getByAddress(info.getIpAddress()),
info.getPort());
+ // XXX FIXME: nsid from datanodeinfo
+ NvmfTransportId transportId = new
NvmfTransportId(inetSocketAddress,
+ new
NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort()));
+ LOG.info("Connecting to NVMf target at " +
transportId.toString());
+ controller = nvme.connect(transportId);
+ controller.getControllerConfiguration().setEnable(true);
+ controller.syncConfiguration();
try {
- URI url = new URI("nvmef://" +
inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
- "/0/" + NvmfStorageConstants.NAMESPACE
+ "?subsystem=nqn.2016-06.io.spdk:cnode1");
- LOG.info("Connecting to " + url.toString());
- endpoint.connect(url);
- } catch (URISyntaxException e) {
- //FIXME
- e.printStackTrace();
+ controller.waitUntilReady();
+ } catch (TimeoutException e) {
+ throw new IOException(e);
}
- sectorSize = endpoint.getSectorSize();
- cache = new NvmfBufferCache();
- ioQeueueSize = endpoint.getIOQueueSize();
- freeCommands = new
ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
- commands = new NvmeCommand[ioQeueueSize];
- for (int i = 0; i < ioQeueueSize; i++) {
- NvmeCommand command = endpoint.newCommand();
- command.setId(i);
- commands[i] = command;
- freeCommands.add(command);
+ IdentifyControllerData identifyControllerData =
controller.getIdentifyControllerData();
+ if (CrailConstants.SLICE_SIZE >
identifyControllerData.getMaximumDataTransferSize().toInt()) {
+ throw new
IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size
(" +
+
identifyControllerData.getMaximumDataTransferSize() + ")");
}
- futures = new NvmfStorageFuture[ioQeueueSize];
- completed = new ThreadLocal<long[]>() {
- public long[] initialValue() {
- return new long[ioQeueueSize];
+ List<Namespace> namespaces = controller.getActiveNamespaces();
+ //TODO: poll nsid in datanodeinfo
+ NamespaceIdentifier namespaceIdentifier = new
NamespaceIdentifier(1);
+ Namespace namespace = null;
+ for (Namespace n : namespaces) {
+ if (n.getIdentifier().equals(namespaceIdentifier)) {
+ namespace = n;
+ break;
}
- };
+ }
+ if (namespace == null) {
+ throw new IllegalArgumentException("No namespace with
id " + namespaceIdentifier +
+ " at controller " +
transportId.toString());
+ }
+ IdentifyNamespaceData identifyNamespaceData =
namespace.getIdentifyNamespaceData();
+ lbaDataSize =
identifyNamespaceData.getFormattedLbaSize().getLbaDataSize().toInt();
+ if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) {
+ throw new
IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
+ " is not a multiple of LBA data size ("
+ lbaDataSize + ")");
+ }
+ namespaceCapacity =
identifyNamespaceData.getNamespaceCapacity() * lbaDataSize;
+ this.queuePair =
controller.createIoQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
+ SubmissionQueueEntry.SIZE);
+
+ this.writeCommands = new
ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
+ this.readCommands = new
ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
+ for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) {
+ NvmWriteCommand writeCommand = new
NvmWriteCommand(queuePair);
+ writeCommand.setSendInline(true);
+
writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
+ writeCommands.add(writeCommand);
+ NvmReadCommand readCommand = new
NvmReadCommand(queuePair);
+ readCommand.setSendInline(true);
+
readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
+ readCommands.add(readCommand);
+ }
+ this.registeredBufferCache = new
NvmfRegisteredBufferCache(queuePair);
+ this.outstandingOperations = new AtomicInteger(0);
+ this.stagingBufferCache = new
NvmfStagingBufferCache(bufferCache,
+ NvmfStorageConstants.STAGING_CACHE_SIZE,
getLBADataSize());
+ this.statistics = statistics;
+ }
+
+ public void keepAlive() throws IOException {
+ controller.keepAlive();
+ }
+
+ public int getLBADataSize() {
+ return lbaDataSize;
}
- public int getSectorSize() {
- return sectorSize;
+ public long getNamespaceCapacity() {
+ return namespaceCapacity;
}
enum Operation {
WRITE,
- READ;
+ READ
}
- public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo
remoteMr, long remoteOffset)
- throws IOException, InterruptedException {
- int length = buffer.remaining();
- if (length > CrailConstants.BLOCK_SIZE){
- throw new IOException("write size too large " + length);
- }
- if (length <= 0){
- throw new IOException("write size too small, len " +
length);
- }
- if (buffer.position() < 0){
- throw new IOException("local offset too small " +
buffer.position());
- }
- if (remoteOffset < 0){
- throw new IOException("remote offset too small " +
remoteOffset);
- }
+ void putOperation() {
+ outstandingOperations.decrementAndGet();
+ }
- if (remoteMr.getAddr() + remoteOffset + length >
endpoint.getNamespaceSize()){
- long tmpAddr = remoteMr.getAddr() + remoteOffset +
length;
- throw new IOException("remote fileOffset + remoteOffset
+ len = " + tmpAddr + " - size = " +
- endpoint.getNamespaceSize());
+ private boolean tryGetOperation() {
+ int outstandingOperationsOld = outstandingOperations.get();
+ if (outstandingOperationsOld < NvmfStorageConstants.QUEUE_SIZE)
{
+ return
outstandingOperations.compareAndSet(outstandingOperationsOld,
outstandingOperationsOld + 1);
}
+ return false;
+ }
-// LOG.info("op = " + op.name() +
-// ", position = " + buffer.position() +
-// ", localOffset = " + buffer.position() +
-// ", remoteOffset = " + remoteOffset +
-// ", remoteAddr = " + remoteMr.getAddr() +
-// ", length = " + length);
-
- NvmeCommand command = freeCommands.poll();
- while (command == null) {
- poll();
- command = freeCommands.poll();
- }
+ private static int divCeil(int a, int b) {
+ return (a + b - 1) / b;
+ }
- boolean aligned =
NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0
- &&
NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0;
- long lba = NvmfStorageUtils.linearBlockAddress(remoteMr,
remoteOffset, sectorSize);
- StorageFuture future = null;
- if (aligned) {
-// LOG.info("aligned");
-
command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba);
- switch(op) {
- case READ:
- command.read();
- break;
- case WRITE:
- command.write();
- break;
- }
- future = futures[(int)command.getId()] = new
NvmfStorageFuture(this, length);
- command.execute();
- } else {
-// LOG.info("unaligned");
- long alignedLength =
NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length);
+ private int getNumLogicalBlocks(CrailBuffer buffer) {
+ return divCeil(buffer.remaining(), getLBADataSize());
+ }
- CrailBuffer stagingBuffer = cache.allocateBuffer();
- stagingBuffer.limit((int)alignedLength);
+ StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo,
long remoteOffset) throws InterruptedException, IOException {
+ assert blockInfo.getAddr() + remoteOffset + buffer.remaining()
<= getNamespaceCapacity();
+ assert remoteOffset >= 0;
+ assert buffer.remaining() <= CrailConstants.BLOCK_SIZE;
+
+ long startingAddress = blockInfo.getAddr() + remoteOffset;
+ if (startingAddress % getLBADataSize() != 0 ||
+ ((startingAddress + buffer.remaining()) %
getLBADataSize() != 0 && op == Operation.WRITE)) {
+ if (op == Operation.READ) {
+ throw new IOException("Unaligned read access is
not supported. Address (" + startingAddress +
+ ") needs to be multiple of LBA
data size " + getLBADataSize());
+ }
try {
- switch(op) {
- case READ: {
- NvmfStorageFuture f =
futures[(int)command.getId()] = new NvmfStorageFuture(this, (int)alignedLength);
-
command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
- future = new
NvmfStorageUnalignedReadFuture(f, this, buffer, remoteMr, remoteOffset,
stagingBuffer);
- break;
- }
- case WRITE: {
- if
(NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0) {
- // Do not read if the
offset is aligned to sector size
- int sizeToWrite =
length;
-
stagingBuffer.put(buffer.getByteBuffer());
-
stagingBuffer.position(0);
-
command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).write().execute();
- future =
futures[(int)command.getId()] = new NvmfStorageUnalignedWriteFuture(this,
sizeToWrite, stagingBuffer);
- } else {
- // RMW but append only
file system allows only reading last sector
- // and dir entries are
sector aligned
-
stagingBuffer.limit(sectorSize);
- NvmfStorageFuture f =
futures[(int)command.getId()] = new NvmfStorageFuture(this, sectorSize);
-
command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
- future = new
NvmfStorageUnalignedRMWFuture(f, this, buffer, remoteMr, remoteOffset,
stagingBuffer);
- }
- break;
- }
- }
- } catch (NoSuchFieldException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
+ return new NvmfUnalignedWriteFuture(this,
buffer, blockInfo, remoteOffset);
+ } catch (Exception e) {
throw new IOException(e);
}
}
+ if (!tryGetOperation()) {
+ do {
+ poll();
+ } while (!tryGetOperation());
+ }
+
+ NvmIoCommand<? extends NvmIoCommandCapsule> command;
+ NvmfFuture<?> future;
+ Response<NvmResponseCapsule> response;
+ if (op == Operation.READ) {
+ NvmReadCommand readCommand = readCommands.remove();
+ response = readCommand.newResponse();
+ future = new NvmfFuture<>(this, readCommand, response,
readCommands, buffer.remaining());
+ command = readCommand;
+ } else {
+ NvmWriteCommand writeCommand = writeCommands.remove();
+ response = writeCommand.newResponse();
+ future = new NvmfFuture<>(this, writeCommand, response,
writeCommands, buffer.remaining());
+ command = writeCommand;
+ }
+ command.setCallback(future);
+ response.setCallback(future);
+
+ NvmIoCommandSqe sqe =
command.getCommandCapsule().getSubmissionQueueEntry();
+ long startingLBA = startingAddress / getLBADataSize();
+ sqe.setStartingLba(startingLBA);
+ /* TODO: on read this potentially overwrites data beyond the
set limit */
+ short numLogicalBlocks = (short)getNumLogicalBlocks(buffer);
--- End diff --
Any particular reason this is a short rather than an integer?
---