Github user animeshtrivedi commented on a diff in the pull request: https://github.com/apache/incubator-crail/pull/16#discussion_r180317997 --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java --- @@ -19,208 +19,229 @@ package org.apache.crail.storage.nvmf.client; -import com.ibm.disni.nvmef.NvmeCommand; -import com.ibm.disni.nvmef.NvmeEndpoint; -import com.ibm.disni.nvmef.NvmeEndpointGroup; -import com.ibm.disni.nvmef.spdk.IOCompletion; - +import com.ibm.jnvmf.*; import org.apache.crail.CrailBuffer; +import org.apache.crail.CrailBufferCache; +import org.apache.crail.CrailStatistics; import org.apache.crail.conf.CrailConstants; -import org.apache.crail.memory.BufferCache; import org.apache.crail.metadata.BlockInfo; +import org.apache.crail.metadata.DataNodeInfo; import org.apache.crail.storage.StorageEndpoint; import org.apache.crail.storage.StorageFuture; -import org.apache.crail.storage.nvmf.NvmfBufferCache; import org.apache.crail.storage.nvmf.NvmfStorageConstants; import org.apache.crail.utils.CrailUtils; import org.slf4j.Logger; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.*; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; public class NvmfStorageEndpoint implements StorageEndpoint { private static final Logger LOG = CrailUtils.getLogger(); - private final InetSocketAddress inetSocketAddress; - private final NvmeEndpoint endpoint; - private final int sectorSize; - private final BufferCache cache; - private final BlockingQueue<NvmeCommand> freeCommands; - private final NvmeCommand[] commands; - private final NvmfStorageFuture[] futures; - private final ThreadLocal<long[]> completed; - private final int ioQeueueSize; - - public 
NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress inetSocketAddress) throws IOException { - this.inetSocketAddress = inetSocketAddress; - endpoint = group.createEndpoint(); + private final Controller controller; + private final IoQueuePair queuePair; + private final int lbaDataSize; + private final long namespaceCapacity; + private final NvmfRegisteredBufferCache registeredBufferCache; + private final NvmfStagingBufferCache stagingBufferCache; + private final CrailStatistics statistics; + + private final Queue<NvmWriteCommand> writeCommands; + private final Queue<NvmReadCommand> readCommands; + + private final AtomicInteger outstandingOperations; + + public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, CrailStatistics statistics, + CrailBufferCache bufferCache) throws IOException { + InetSocketAddress inetSocketAddress = new InetSocketAddress( + InetAddress.getByAddress(info.getIpAddress()), info.getPort()); + // XXX FIXME: nsid from datanodeinfo + NvmfTransportId transportId = new NvmfTransportId(inetSocketAddress, + new NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort())); + LOG.info("Connecting to NVMf target at " + transportId.toString()); + controller = nvme.connect(transportId); + controller.getControllerConfiguration().setEnable(true); + controller.syncConfiguration(); try { - URI url = new URI("nvmef://" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + - "/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=nqn.2016-06.io.spdk:cnode1"); - LOG.info("Connecting to " + url.toString()); - endpoint.connect(url); - } catch (URISyntaxException e) { - //FIXME - e.printStackTrace(); + controller.waitUntilReady(); + } catch (TimeoutException e) { + throw new IOException(e); } - sectorSize = endpoint.getSectorSize(); - cache = new NvmfBufferCache(); - ioQeueueSize = endpoint.getIOQueueSize(); - freeCommands = new ArrayBlockingQueue<NvmeCommand>(ioQeueueSize); - commands = new NvmeCommand[ioQeueueSize]; - for 
(int i = 0; i < ioQeueueSize; i++) { - NvmeCommand command = endpoint.newCommand(); - command.setId(i); - commands[i] = command; - freeCommands.add(command); + IdentifyControllerData identifyControllerData = controller.getIdentifyControllerData(); + if (CrailConstants.SLICE_SIZE > identifyControllerData.getMaximumDataTransferSize().toInt()) { + throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size (" + + identifyControllerData.getMaximumDataTransferSize() + ")"); } - futures = new NvmfStorageFuture[ioQeueueSize]; - completed = new ThreadLocal<long[]>() { - public long[] initialValue() { - return new long[ioQeueueSize]; + List<Namespace> namespaces = controller.getActiveNamespaces(); + //TODO: poll nsid in datanodeinfo + NamespaceIdentifier namespaceIdentifier = new NamespaceIdentifier(1); + Namespace namespace = null; + for (Namespace n : namespaces) { + if (n.getIdentifier().equals(namespaceIdentifier)) { + namespace = n; + break; } - }; + } + if (namespace == null) { + throw new IllegalArgumentException("No namespace with id " + namespaceIdentifier + + " at controller " + transportId.toString()); + } + IdentifyNamespaceData identifyNamespaceData = namespace.getIdentifyNamespaceData(); + lbaDataSize = identifyNamespaceData.getFormattedLbaSize().getLbaDataSize().toInt(); + if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) { + throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + + " is not a multiple of LBA data size (" + lbaDataSize + ")"); + } + namespaceCapacity = identifyNamespaceData.getNamespaceCapacity() * lbaDataSize; + this.queuePair = controller.createIoQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0, + SubmissionQueueEntry.SIZE); + + this.writeCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE); + this.readCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE); + for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) { + NvmWriteCommand writeCommand = new 
NvmWriteCommand(queuePair); + writeCommand.setSendInline(true); + writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier); + writeCommands.add(writeCommand); + NvmReadCommand readCommand = new NvmReadCommand(queuePair); + readCommand.setSendInline(true); + readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier); + readCommands.add(readCommand); + } + this.registeredBufferCache = new NvmfRegisteredBufferCache(queuePair); + this.outstandingOperations = new AtomicInteger(0); + this.stagingBufferCache = new NvmfStagingBufferCache(bufferCache, + NvmfStorageConstants.STAGING_CACHE_SIZE, getLBADataSize()); + this.statistics = statistics; + } + + public void keepAlive() throws IOException { + controller.keepAlive(); + } + + public int getLBADataSize() { + return lbaDataSize; } - public int getSectorSize() { - return sectorSize; + public long getNamespaceCapacity() { + return namespaceCapacity; } enum Operation { WRITE, - READ; + READ } - public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset) - throws IOException, InterruptedException { - int length = buffer.remaining(); - if (length > CrailConstants.BLOCK_SIZE){ - throw new IOException("write size too large " + length); - } - if (length <= 0){ - throw new IOException("write size too small, len " + length); - } - if (buffer.position() < 0){ - throw new IOException("local offset too small " + buffer.position()); - } - if (remoteOffset < 0){ - throw new IOException("remote offset too small " + remoteOffset); - } + void putOperation() { + outstandingOperations.decrementAndGet(); + } - if (remoteMr.getAddr() + remoteOffset + length > endpoint.getNamespaceSize()){ - long tmpAddr = remoteMr.getAddr() + remoteOffset + length; - throw new IOException("remote fileOffset + remoteOffset + len = " + tmpAddr + " - size = " + - endpoint.getNamespaceSize()); + private boolean tryGetOperation() { + int 
outstandingOperationsOld = outstandingOperations.get(); + if (outstandingOperationsOld < NvmfStorageConstants.QUEUE_SIZE) { + return outstandingOperations.compareAndSet(outstandingOperationsOld, outstandingOperationsOld + 1); } + return false; + } -// LOG.info("op = " + op.name() + -// ", position = " + buffer.position() + -// ", localOffset = " + buffer.position() + -// ", remoteOffset = " + remoteOffset + -// ", remoteAddr = " + remoteMr.getAddr() + -// ", length = " + length); - - NvmeCommand command = freeCommands.poll(); - while (command == null) { - poll(); - command = freeCommands.poll(); - } + private static int divCeil(int a, int b) { + return (a + b - 1) / b; + } - boolean aligned = NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0 - && NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0; - long lba = NvmfStorageUtils.linearBlockAddress(remoteMr, remoteOffset, sectorSize); - StorageFuture future = null; - if (aligned) { -// LOG.info("aligned"); - command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba); - switch(op) { - case READ: - command.read(); - break; - case WRITE: - command.write(); - break; - } - future = futures[(int)command.getId()] = new NvmfStorageFuture(this, length); - command.execute(); - } else { -// LOG.info("unaligned"); - long alignedLength = NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length); + private int getNumLogicalBlocks(CrailBuffer buffer) { + return divCeil(buffer.remaining(), getLBADataSize()); + } - CrailBuffer stagingBuffer = cache.allocateBuffer(); - stagingBuffer.limit((int)alignedLength); + StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException { + assert blockInfo.getAddr() + remoteOffset + buffer.remaining() <= getNamespaceCapacity(); + assert remoteOffset >= 0; + assert buffer.remaining() <= CrailConstants.BLOCK_SIZE; + + long startingAddress = blockInfo.getAddr() + remoteOffset; + if 
(startingAddress % getLBADataSize() != 0 || + ((startingAddress + buffer.remaining()) % getLBADataSize() != 0 && op == Operation.WRITE)) { + if (op == Operation.READ) { + throw new IOException("Unaligned read access is not supported. Address (" + startingAddress + + ") needs to be multiple of LBA data size " + getLBADataSize()); + } try { - switch(op) { - case READ: { - NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, (int)alignedLength); - command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute(); - future = new NvmfStorageUnalignedReadFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer); - break; - } - case WRITE: { - if (NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0) { - // Do not read if the offset is aligned to sector size - int sizeToWrite = length; - stagingBuffer.put(buffer.getByteBuffer()); - stagingBuffer.position(0); - command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).write().execute(); - future = futures[(int)command.getId()] = new NvmfStorageUnalignedWriteFuture(this, sizeToWrite, stagingBuffer); - } else { - // RMW but append only file system allows only reading last sector - // and dir entries are sector aligned - stagingBuffer.limit(sectorSize); - NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, sectorSize); - command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute(); - future = new NvmfStorageUnalignedRMWFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer); - } - break; - } - } - } catch (NoSuchFieldException e) { - throw new IOException(e); - } catch (IllegalAccessException e) { + return new NvmfUnalignedWriteFuture(this, buffer, blockInfo, remoteOffset); + } catch (Exception e) { throw new IOException(e); } } + if (!tryGetOperation()) { + do { + poll(); + } while (!tryGetOperation()); + } + + NvmIoCommand<? 
extends NvmIoCommandCapsule> command; + NvmfFuture<?> future; + Response<NvmResponseCapsule> response; + if (op == Operation.READ) { + NvmReadCommand readCommand = readCommands.remove(); + response = readCommand.newResponse(); + future = new NvmfFuture<>(this, readCommand, response, readCommands, buffer.remaining()); + command = readCommand; + } else { + NvmWriteCommand writeCommand = writeCommands.remove(); + response = writeCommand.newResponse(); + future = new NvmfFuture<>(this, writeCommand, response, writeCommands, buffer.remaining()); + command = writeCommand; + } + command.setCallback(future); + response.setCallback(future); + + NvmIoCommandSqe sqe = command.getCommandCapsule().getSubmissionQueueEntry(); + long startingLBA = startingAddress / getLBADataSize(); + sqe.setStartingLba(startingLBA); + /* TODO: on read this potentially overwrites data beyond the set limit */ + short numLogicalBlocks = (short)getNumLogicalBlocks(buffer); --- End diff -- Is there any particular reason this is a short rather than an integer?
---