Ian Maxon has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/3133

Change subject: [WIP][NO ISSUE] Make IOManager more configurable
......................................................................

[WIP][NO ISSUE] Make IOManager more configurable

Change-Id: I1c8ad11c2b8b983ef4bf7cf78c2f068accddfff4
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
M hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
11 files changed, 46 insertions(+), 32 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/33/3133/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index d89004b..aced54b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -87,6 +87,7 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -177,6 +178,7 @@
     @Override
     public void initialize(IRecoveryManagerFactory recoveryManagerFactory, 
boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
+        int ioQueueLen = 
getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
         threadExecutor =
                 
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
@@ -230,11 +232,11 @@
             replicationChannel = new ReplicationChannel(this);
 
             bufferCache = new BufferCache(ioManager, prs, pcp, new 
FileMapManager(),
-                    storageProperties.getBufferCacheMaxOpenFiles(), 
getServiceContext().getThreadFactory(),
+                    storageProperties.getBufferCacheMaxOpenFiles(), 
ioQueueLen, getServiceContext().getThreadFactory(),
                     replicationManager);
         } else {
             bufferCache = new BufferCache(ioManager, prs, pcp, new 
FileMapManager(),
-                    storageProperties.getBufferCacheMaxOpenFiles(), 
getServiceContext().getThreadFactory());
+                    storageProperties.getBufferCacheMaxOpenFiles(), 
ioQueueLen, getServiceContext().getThreadFactory());
         }
 
         /*
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index d41350f..19d69f2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -89,7 +89,9 @@
         TRACE_CATEGORIES(STRING_ARRAY, new String[0]),
         KEY_STORE_PATH(STRING, (String) null),
         TRUST_STORE_PATH(STRING, (String) null),
-        KEY_STORE_PASSWORD(STRING, (String) null);
+        KEY_STORE_PASSWORD(STRING, (String) null),
+        IO_WORKERS_PER_PARTITION(INTEGER, 2),
+        IO_QUEUE_SIZE(INTEGER, 10);
 
         private final IOptionType parser;
         private final String defaultValueDescription;
@@ -218,8 +220,12 @@
                     return "A fully-qualified path to a trust store file that 
will be used for secured connections";
                 case KEY_STORE_PASSWORD:
                     return "The password to the provided key store";
+                case IO_WORKERS_PER_PARTITION:
+                    return "Number of threads per partition used to write and 
read from storage";
+                case IO_QUEUE_SIZE:
+                    return "Length of the queue used for requests to write and 
read";
                 default:
-                    throw new IllegalStateException("NYI: " + this);
+                    throw new IllegalStateException("Not yet implemented: " + 
this);
             }
         }
 
@@ -568,4 +574,12 @@
     public void setTrustStorePath(String keyStorePath) {
         configManager.set(CCConfig.Option.TRUST_STORE_PATH, keyStorePath);
     }
+
+    public int getIOParallelism() {
+        return appConfig.getInt(Option.IO_WORKERS_PER_PARTITION);
+    }
+
+    public int getIOQueueLen() {
+        return appConfig.getInt(Option.IO_QUEUE_SIZE);
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 317d59a..9fae522 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -214,8 +214,8 @@
         ncShutdownHook = new NCShutdownHook(this);
         Runtime.getRuntime().addShutdownHook(ncShutdownHook);
         
Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
-        ioManager =
-                new 
IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), 
application.getFileDeviceResolver());
+        ioManager = new 
IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()),
+                application.getFileDeviceResolver(), 
ncConfig.getIOParallelism(), ncConfig.getIOQueueLen());
         try {
             workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves 
MAX_PRIORITY of the heartbeat thread.
             jobletMap = new ConcurrentHashMap<>();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b5cb21a..149d925 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -56,7 +56,6 @@
     /*
      * Constants
      */
-    public static final int IO_REQUEST_QUEUE_SIZE = 100; // TODO: Make 
configurable
     private static final Logger LOGGER = LogManager.getLogger();
     private static final String WORKSPACE_FILE_SUFFIX = ".waf";
     private static final FilenameFilter WORKSPACE_FILES_FILTER = (dir, name) 
-> name.endsWith(WORKSPACE_FILE_SUFFIX);
@@ -68,14 +67,19 @@
     private final BlockingQueue<IoRequest> freeRequests;
     private final List<IODeviceHandle> ioDevices;
     private final List<IODeviceHandle> workspaces;
+    private final int ioParallelism;
+    private final int queueLen;
     /*
      * Mutables
      */
     private int workspaceIndex;
     private final IFileDeviceResolver deviceComputer;
 
-    public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver 
deviceComputer) throws HyracksDataException {
+    public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver 
deviceComputer, int ioParallelism, int queueLen)
+            throws HyracksDataException {
         this.ioDevices = Collections.unmodifiableList(devices);
+        this.ioParallelism = ioParallelism;
+        this.queueLen = queueLen;
         checkDeviceValidity(devices);
         workspaces = new ArrayList<>();
         for (IODeviceHandle d : ioDevices) {
@@ -93,9 +97,9 @@
         }
         workspaceIndex = 0;
         this.deviceComputer = deviceComputer;
-        submittedRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
-        freeRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
-        int numIoThreads = ioDevices.size() * 2;
+        submittedRequests = new ArrayBlockingQueue<>(queueLen);
+        freeRequests = new ArrayBlockingQueue<>(queueLen);
+        int numIoThreads = ioDevices.size() * ioParallelism;
         executor = Executors.newFixedThreadPool(numIoThreads);
         for (int i = 0; i < numIoThreads; i++) {
             executor.execute(new IoRequestHandler(i, submittedRequests));
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index 32f8418..4005db1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -60,7 +60,7 @@
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
         IPageReplacementStrategy prs = new 
ClockPageReplacementStrategy(allocator, 32768, 50);
         bufferCache = new BufferCache(appCtx.getIoManager(), prs, new 
DelayPageCleanerPolicy(1000), fileMapManager, 100,
-                threadFactory);
+                10, threadFactory);
         ILocalResourceRepositoryFactory localResourceRepositoryFactory = new 
TransientLocalResourceRepositoryFactory();
         localResourceRepository = 
localResourceRepositoryFactory.createRepository();
         resourceIdFactory = (new 
ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index 423925b..31037ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -50,11 +50,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-control-nc</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-util</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 7441395..451e38c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.storage.common.buffercache;
 
-import static org.apache.hyracks.control.nc.io.IOManager.IO_REQUEST_QUEUE_SIZE;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -76,8 +74,7 @@
     private final CleanerThread cleanerThread;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
     private final AsyncFIFOPageQueueManager fifoWriter;
-    private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache =
-            new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
+    private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
 
     private IIOReplicationManager ioReplicationManager;
     private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
@@ -93,8 +90,9 @@
     private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner;
 
     public BufferCache(IIOManager ioManager, IPageReplacementStrategy 
pageReplacementStrategy,
-            IPageCleanerPolicy pageCleanerPolicy, IFileMapManager 
fileMapManager, int maxOpenFiles,
+            IPageCleanerPolicy pageCleanerPolicy, IFileMapManager 
fileMapManager, int maxOpenFiles, int ioQueuelen,
             ThreadFactory threadFactory) {
+        this.headerPageCache = new ArrayBlockingQueue<>(ioQueuelen);
         this.ioManager = ioManager;
         this.pageSize = pageReplacementStrategy.getPageSize();
         this.maxOpenFiles = maxOpenFiles;
@@ -124,10 +122,11 @@
 
     //this constructor is used when replication is enabled to pass the 
IIOReplicationManager
     public BufferCache(IIOManager ioManager, IPageReplacementStrategy 
pageReplacementStrategy,
-            IPageCleanerPolicy pageCleanerPolicy, IFileMapManager 
fileMapManager, int maxOpenFiles,
+            IPageCleanerPolicy pageCleanerPolicy, IFileMapManager 
fileMapManager, int maxOpenFiles, int ioQueueLen,
             ThreadFactory threadFactory, IIOReplicationManager 
ioReplicationManager) {
 
-        this(ioManager, pageReplacementStrategy, pageCleanerPolicy, 
fileMapManager, maxOpenFiles, threadFactory);
+        this(ioManager, pageReplacementStrategy, pageCleanerPolicy, 
fileMapManager, maxOpenFiles, ioQueueLen,
+                threadFactory);
         this.ioReplicationManager = ioReplicationManager;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 7b7e850..a2da285 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -107,7 +107,7 @@
             List<IODeviceHandle> devices = new ArrayList<>();
             devices.add(new IODeviceHandle(new 
File(System.getProperty("user.dir") + File.separator + "target"),
                     "iodev_test_wa"));
-            ioManager = new IOManager(devices, new DefaultDeviceResolver());
+            ioManager = new IOManager(devices, new DefaultDeviceResolver(), 2, 
10);
         }
         return ioManager;
     }
@@ -152,7 +152,7 @@
         IPageReplacementStrategy prs = new 
ClockPageReplacementStrategy(allocator, pageSize, numPages);
         IFileMapProvider fileMapProvider = getFileMapProvider();
         bufferCache = new BufferCache(ioManager, prs, new 
DelayPageCleanerPolicy(1000),
-                (IFileMapManager) fileMapProvider, maxOpenFiles, 
threadFactory);
+                (IFileMapManager) fileMapProvider, maxOpenFiles, 10, 
threadFactory);
         return bufferCache;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index ebfaeb8..e36b655 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -76,7 +76,7 @@
     private static IOManager createIoManager() throws HyracksException {
         List<IODeviceHandle> devices = new ArrayList<>();
         devices.add(new IODeviceHandle(new 
File(System.getProperty("java.io.tmpdir")), "."));
-        return new IOManager(devices, new DefaultDeviceResolver());
+        return new IOManager(devices, new DefaultDeviceResolver(), 2, 10);
     }
 
     public static void compareWithResult(File expectedFile, File actualFile) 
throws Exception {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
index 22456e8..5d10603 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
@@ -256,7 +256,7 @@
             String iodevPath = System.getProperty("java.io.tmpdir") + sep + 
"test_iodev" + i;
             devices.add(new IODeviceHandle(new File(iodevPath), "wa"));
         }
-        return new IOManager(devices, new DefaultDeviceResolver());
+        return new IOManager(devices, new DefaultDeviceResolver(), 2, 10);
     }
 
     private FileReference simulateMerge(ILSMIndexFileManager fileManager, 
FileReference a, FileReference b)
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
index e2a875b..b0ecc1a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
@@ -38,8 +38,8 @@
     public void testPrefixNames() throws HyracksDataException {
         IODeviceHandle shorter = new IODeviceHandle(new File("/tmp/tst/1"), 
"storage");
         IODeviceHandle longer = new IODeviceHandle(new File("/tmp/tst/11"), 
"storage");
-        IOManager ioManager =
-                new IOManager(Arrays.asList(new IODeviceHandle[] { shorter, 
longer }), new DefaultDeviceResolver());
+        IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] 
{ shorter, longer }),
+                new DefaultDeviceResolver(), 2, 10);
         FileReference f = 
ioManager.resolveAbsolutePath("/tmp/tst/11/storage/Foo_idx_foo/my_btree");
         Assert.assertEquals("/tmp/tst/11/storage/Foo_idx_foo/my_btree", 
f.getAbsolutePath());
     }
@@ -48,8 +48,8 @@
     public void testDuplicates() throws HyracksDataException {
         IODeviceHandle first = new IODeviceHandle(new File("/tmp/tst/1"), 
"storage");
         IODeviceHandle second = new IODeviceHandle(new File("/tmp/tst/1"), 
"storage");
-        IOManager ioManager =
-                new IOManager(Arrays.asList(new IODeviceHandle[] { first, 
second }), new DefaultDeviceResolver());
+        IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] 
{ first, second }),
+                new DefaultDeviceResolver(), 2, 19);
     }
 
     @After

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3133
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1c8ad11c2b8b983ef4bf7cf78c2f068accddfff4
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Ian Maxon <ima...@apache.org>

Reply via email to