GitHub user PepperJo commented on a diff in the pull request:
https://github.com/apache/incubator-crail/pull/16#discussion_r180337495
--- Diff:
storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
---
@@ -19,208 +19,229 @@
package org.apache.crail.storage.nvmf.client;
-import com.ibm.disni.nvmef.NvmeCommand;
-import com.ibm.disni.nvmef.NvmeEndpoint;
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-import com.ibm.disni.nvmef.spdk.IOCompletion;
-
+import com.ibm.jnvmf.*;
import org.apache.crail.CrailBuffer;
+import org.apache.crail.CrailBufferCache;
+import org.apache.crail.CrailStatistics;
import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.memory.BufferCache;
import org.apache.crail.metadata.BlockInfo;
+import org.apache.crail.metadata.DataNodeInfo;
import org.apache.crail.storage.StorageEndpoint;
import org.apache.crail.storage.StorageFuture;
-import org.apache.crail.storage.nvmf.NvmfBufferCache;
import org.apache.crail.storage.nvmf.NvmfStorageConstants;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.*;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
public class NvmfStorageEndpoint implements StorageEndpoint {
private static final Logger LOG = CrailUtils.getLogger();
- private final InetSocketAddress inetSocketAddress;
- private final NvmeEndpoint endpoint;
- private final int sectorSize;
- private final BufferCache cache;
- private final BlockingQueue<NvmeCommand> freeCommands;
- private final NvmeCommand[] commands;
- private final NvmfStorageFuture[] futures;
- private final ThreadLocal<long[]> completed;
- private final int ioQeueueSize;
-
- public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress
inetSocketAddress) throws IOException {
- this.inetSocketAddress = inetSocketAddress;
- endpoint = group.createEndpoint();
+ private final Controller controller;
+ private final IoQueuePair queuePair;
+ private final int lbaDataSize;
+ private final long namespaceCapacity;
+ private final NvmfRegisteredBufferCache registeredBufferCache;
+ private final NvmfStagingBufferCache stagingBufferCache;
+ private final CrailStatistics statistics;
+
+ private final Queue<NvmWriteCommand> writeCommands;
+ private final Queue<NvmReadCommand> readCommands;
+
+ private final AtomicInteger outstandingOperations;
+
+ public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info,
CrailStatistics statistics,
+ CrailBufferCache
bufferCache) throws IOException {
+ InetSocketAddress inetSocketAddress = new InetSocketAddress(
+ InetAddress.getByAddress(info.getIpAddress()),
info.getPort());
+ // XXX FIXME: nsid from datanodeinfo
+ NvmfTransportId transportId = new
NvmfTransportId(inetSocketAddress,
+ new
NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort()));
+ LOG.info("Connecting to NVMf target at " +
transportId.toString());
+ controller = nvme.connect(transportId);
+ controller.getControllerConfiguration().setEnable(true);
+ controller.syncConfiguration();
try {
- URI url = new URI("nvmef://" +
inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
- "/0/" + NvmfStorageConstants.NAMESPACE
+ "?subsystem=nqn.2016-06.io.spdk:cnode1");
- LOG.info("Connecting to " + url.toString());
- endpoint.connect(url);
- } catch (URISyntaxException e) {
- //FIXME
- e.printStackTrace();
+ controller.waitUntilReady();
+ } catch (TimeoutException e) {
+ throw new IOException(e);
}
- sectorSize = endpoint.getSectorSize();
- cache = new NvmfBufferCache();
- ioQeueueSize = endpoint.getIOQueueSize();
- freeCommands = new
ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
- commands = new NvmeCommand[ioQeueueSize];
- for (int i = 0; i < ioQeueueSize; i++) {
- NvmeCommand command = endpoint.newCommand();
- command.setId(i);
- commands[i] = command;
- freeCommands.add(command);
+ IdentifyControllerData identifyControllerData =
controller.getIdentifyControllerData();
+ if (CrailConstants.SLICE_SIZE >
identifyControllerData.getMaximumDataTransferSize().toInt()) {
+ throw new
IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size
(" +
+
identifyControllerData.getMaximumDataTransferSize() + ")");
}
- futures = new NvmfStorageFuture[ioQeueueSize];
- completed = new ThreadLocal<long[]>() {
- public long[] initialValue() {
- return new long[ioQeueueSize];
+ List<Namespace> namespaces = controller.getActiveNamespaces();
+ //TODO: poll nsid in datanodeinfo
+ NamespaceIdentifier namespaceIdentifier = new
NamespaceIdentifier(1);
+ Namespace namespace = null;
+ for (Namespace n : namespaces) {
+ if (n.getIdentifier().equals(namespaceIdentifier)) {
+ namespace = n;
+ break;
}
- };
+ }
+ if (namespace == null) {
+ throw new IllegalArgumentException("No namespace with
id " + namespaceIdentifier +
+ " at controller " +
transportId.toString());
+ }
+ IdentifyNamespaceData identifyNamespaceData =
namespace.getIdentifyNamespaceData();
+ lbaDataSize =
identifyNamespaceData.getFormattedLbaSize().getLbaDataSize().toInt();
+ if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) {
+ throw new
IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
+ " is not a multiple of LBA data size ("
+ lbaDataSize + ")");
+ }
+ namespaceCapacity =
identifyNamespaceData.getNamespaceCapacity() * lbaDataSize;
+ this.queuePair =
controller.createIoQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
+ SubmissionQueueEntry.SIZE);
+
+ this.writeCommands = new
ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
+ this.readCommands = new
ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
+ for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) {
+ NvmWriteCommand writeCommand = new
NvmWriteCommand(queuePair);
+ writeCommand.setSendInline(true);
+
writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
+ writeCommands.add(writeCommand);
+ NvmReadCommand readCommand = new
NvmReadCommand(queuePair);
+ readCommand.setSendInline(true);
+
readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
+ readCommands.add(readCommand);
+ }
+ this.registeredBufferCache = new
NvmfRegisteredBufferCache(queuePair);
+ this.outstandingOperations = new AtomicInteger(0);
+ this.stagingBufferCache = new
NvmfStagingBufferCache(bufferCache,
+ NvmfStorageConstants.STAGING_CACHE_SIZE,
getLBADataSize());
+ this.statistics = statistics;
+ }
+
+ public void keepAlive() throws IOException {
+ controller.keepAlive();
+ }
+
+ public int getLBADataSize() {
+ return lbaDataSize;
}
- public int getSectorSize() {
- return sectorSize;
+ public long getNamespaceCapacity() {
+ return namespaceCapacity;
}
enum Operation {
WRITE,
- READ;
+ READ
}
- public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo
remoteMr, long remoteOffset)
- throws IOException, InterruptedException {
- int length = buffer.remaining();
- if (length > CrailConstants.BLOCK_SIZE){
- throw new IOException("write size too large " + length);
- }
- if (length <= 0){
- throw new IOException("write size too small, len " +
length);
- }
- if (buffer.position() < 0){
- throw new IOException("local offset too small " +
buffer.position());
- }
- if (remoteOffset < 0){
- throw new IOException("remote offset too small " +
remoteOffset);
- }
+ void putOperation() {
+ outstandingOperations.decrementAndGet();
+ }
- if (remoteMr.getAddr() + remoteOffset + length >
endpoint.getNamespaceSize()){
- long tmpAddr = remoteMr.getAddr() + remoteOffset +
length;
- throw new IOException("remote fileOffset + remoteOffset
+ len = " + tmpAddr + " - size = " +
- endpoint.getNamespaceSize());
+ private boolean tryGetOperation() {
+ int outstandingOperationsOld = outstandingOperations.get();
+ if (outstandingOperationsOld < NvmfStorageConstants.QUEUE_SIZE)
{
+ return
outstandingOperations.compareAndSet(outstandingOperationsOld,
outstandingOperationsOld + 1);
}
+ return false;
+ }
-// LOG.info("op = " + op.name() +
-// ", position = " + buffer.position() +
-// ", localOffset = " + buffer.position() +
-// ", remoteOffset = " + remoteOffset +
-// ", remoteAddr = " + remoteMr.getAddr() +
-// ", length = " + length);
-
- NvmeCommand command = freeCommands.poll();
- while (command == null) {
- poll();
- command = freeCommands.poll();
- }
+ private static int divCeil(int a, int b) {
+ return (a + b - 1) / b;
+ }
- boolean aligned =
NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0
- &&
NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0;
- long lba = NvmfStorageUtils.linearBlockAddress(remoteMr,
remoteOffset, sectorSize);
- StorageFuture future = null;
- if (aligned) {
-// LOG.info("aligned");
-
command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba);
- switch(op) {
- case READ:
- command.read();
- break;
- case WRITE:
- command.write();
- break;
- }
- future = futures[(int)command.getId()] = new
NvmfStorageFuture(this, length);
- command.execute();
- } else {
-// LOG.info("unaligned");
- long alignedLength =
NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length);
+ private int getNumLogicalBlocks(CrailBuffer buffer) {
+ return divCeil(buffer.remaining(), getLBADataSize());
+ }
- CrailBuffer stagingBuffer = cache.allocateBuffer();
- stagingBuffer.limit((int)alignedLength);
+ StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo,
long remoteOffset) throws InterruptedException, IOException {
+ assert blockInfo.getAddr() + remoteOffset + buffer.remaining()
<= getNamespaceCapacity();
--- End diff --
Yes, this assertion is meant for debugging purposes only.
---