Ian Maxon has submitted this change and it was merged. ( https://asterix-gerrit.ics.uci.edu/3133 )
Change subject: [NO ISSUE] Make IOManager more configurable ...................................................................... [NO ISSUE] Make IOManager more configurable Change-Id: I1c8ad11c2b8b983ef4bf7cf78c2f068accddfff4 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3133 Contrib: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Contrib: Michael Blow <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java M hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java 11 files changed, 42 insertions(+), 32 deletions(-) Approvals: Jenkins: Verified; ; Verified Michael Blow: Looks good to me, approved; Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index e663b49..8b8f5a0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -89,6 +89,7 @@ import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.network.INetworkSecurityManager; +import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.ipc.impl.HyracksConnection; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; @@ -185,6 +186,7 @@ public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory, boolean initialRun) throws IOException { ioManager = getServiceContext().getIoManager(); + int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE); threadExecutor = MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory()); ICacheMemoryAllocator allocator = new HeapBufferAllocator(); @@ -239,11 +241,11 @@ replicationChannel = new ReplicationChannel(this); bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), - storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory(), + storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory(), replicationManager); } else { bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), - storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory()); + storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory()); } /* diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index 3619cbb..dd92798 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -88,7 +88,9 @@ TRACE_CATEGORIES(STRING_ARRAY, new String[0]), KEY_STORE_PATH(STRING, (String) null), TRUST_STORE_PATH(STRING, (String) null), - KEY_STORE_PASSWORD(STRING, (String) null); + KEY_STORE_PASSWORD(STRING, (String) null), + IO_WORKERS_PER_PARTITION(POSITIVE_INTEGER, 2), + IO_QUEUE_SIZE(POSITIVE_INTEGER, 10); private final IOptionType parser; private final String defaultValueDescription; @@ -217,8 +219,12 @@ return "A fully-qualified path to a trust store file that will be used for secured connections"; case KEY_STORE_PASSWORD: return "The password to the provided key store"; + case IO_WORKERS_PER_PARTITION: + return "Number of threads per partition used to write and read from storage"; + case IO_QUEUE_SIZE: + return "Length of the queue used for requests to write and read"; default: - throw new IllegalStateException("NYI: " + this); + throw new IllegalStateException("Not yet implemented: " + this); } } @@ -575,4 +581,12 @@ public void setTrustStorePath(String keyStorePath) { configManager.set(nodeId, Option.TRUST_STORE_PATH, keyStorePath); } + + public int getIOParallelism() { + return appConfig.getInt(Option.IO_WORKERS_PER_PARTITION); + } + + public int getIOQueueSize() { + return appConfig.getInt(Option.IO_QUEUE_SIZE); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 317d59a..517169b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -214,8 +214,8 @@ ncShutdownHook = new NCShutdownHook(this); Runtime.getRuntime().addShutdownHook(ncShutdownHook); Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager()); - ioManager = - new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver()); + ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), + application.getFileDeviceResolver(), ncConfig.getIOParallelism(), ncConfig.getIOQueueSize()); try { workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. jobletMap = new ConcurrentHashMap<>(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java index b5cb21a..14404d2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java @@ -56,7 +56,6 @@ /* * Constants */ - public static final int IO_REQUEST_QUEUE_SIZE = 100; // TODO: Make configurable private static final Logger LOGGER = LogManager.getLogger(); private static final String WORKSPACE_FILE_SUFFIX = ".waf"; private static final FilenameFilter WORKSPACE_FILES_FILTER = (dir, name) -> name.endsWith(WORKSPACE_FILE_SUFFIX); @@ -74,7 +73,8 @@ private int workspaceIndex; private final IFileDeviceResolver deviceComputer; - public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer) throws HyracksDataException { + public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism, int queueSize) + throws HyracksDataException { this.ioDevices = Collections.unmodifiableList(devices); checkDeviceValidity(devices); workspaces = new ArrayList<>(); @@ -93,9 +93,9 @@ } workspaceIndex = 0; this.deviceComputer = deviceComputer; - submittedRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE); - freeRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE); - int numIoThreads = ioDevices.size() * 2; + submittedRequests = new ArrayBlockingQueue<>(queueSize); + freeRequests = new ArrayBlockingQueue<>(queueSize); + int numIoThreads = ioDevices.size() * ioParallelism; executor = Executors.newFixedThreadPool(numIoThreads); for (int i = 0; i < numIoThreads; i++) { executor.execute(new IoRequestHandler(i, submittedRequests)); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java index d107ff2..7ba81e1 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java @@ -60,7 +60,7 @@ ICacheMemoryAllocator allocator = new HeapBufferAllocator(); IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, 32768, 50); bufferCache = new BufferCache(appCtx.getIoManager(), prs, new DelayPageCleanerPolicy(1000), fileMapManager, 100, - threadFactory); + 10, threadFactory); ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory(); localResourceRepository = localResourceRepositoryFactory.createRepository(); resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml index 423925b..31037ff 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml @@ -50,11 +50,6 @@ </dependency> <dependency> <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-control-nc</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-util</artifactId> <version>${project.version}</version> </dependency> diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index b40f252..c4c5d8a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -18,8 +18,6 @@ */ package org.apache.hyracks.storage.common.buffercache; -import static org.apache.hyracks.control.nc.io.IOManager.IO_REQUEST_QUEUE_SIZE; - import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -76,8 +74,7 @@ private final CleanerThread cleanerThread; private final Map<Integer, BufferedFileHandle> fileInfoMap; private final AsyncFIFOPageQueueManager fifoWriter; - private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache = - new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE); + private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache; private IIOReplicationManager ioReplicationManager; private final List<ICachedPageInternal> cachedPages = new ArrayList<>(); @@ -93,8 +90,9 @@ private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner; public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy, - IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, + IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, int ioQueuelen, ThreadFactory threadFactory) { + this.headerPageCache = new ArrayBlockingQueue<>(ioQueuelen); this.ioManager = ioManager; this.pageSize = pageReplacementStrategy.getPageSize(); this.maxOpenFiles = maxOpenFiles; @@ -124,10 +122,11 @@ //this constructor is used when replication is enabled to pass the IIOReplicationManager public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy, - IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, + IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, int ioQueueLen, ThreadFactory threadFactory, IIOReplicationManager ioReplicationManager) { - this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, threadFactory); + this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, ioQueueLen, + threadFactory); this.ioReplicationManager = ioReplicationManager; } diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java index 7b7e850..a2da285 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java @@ -107,7 +107,7 @@ List<IODeviceHandle> devices = new ArrayList<>(); devices.add(new IODeviceHandle(new File(System.getProperty("user.dir") + File.separator + "target"), "iodev_test_wa")); - ioManager = new IOManager(devices, new DefaultDeviceResolver()); + ioManager = new IOManager(devices, new DefaultDeviceResolver(), 2, 10); } return ioManager; } @@ -152,7 +152,7 @@ IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, pageSize, numPages); IFileMapProvider fileMapProvider = getFileMapProvider(); bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000), - (IFileMapManager) fileMapProvider, maxOpenFiles, threadFactory); + (IFileMapManager) fileMapProvider, maxOpenFiles, 10, threadFactory); return bufferCache; } } diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java index ebfaeb8..e36b655 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java @@ -76,7 +76,7 @@ private static IOManager createIoManager() throws HyracksException { List<IODeviceHandle> devices = new ArrayList<>(); devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), ".")); - return new IOManager(devices, new DefaultDeviceResolver()); + return new IOManager(devices, new DefaultDeviceResolver(), 2, 10); } public static void compareWithResult(File expectedFile, File actualFile) throws Exception { diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java index 22456e8..5d10603 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java @@ -256,7 +256,7 @@ String iodevPath = System.getProperty("java.io.tmpdir") + sep + "test_iodev" + i; devices.add(new IODeviceHandle(new File(iodevPath), "wa")); } - return new IOManager(devices, new DefaultDeviceResolver()); + return new IOManager(devices, new DefaultDeviceResolver(), 2, 10); } private FileReference simulateMerge(ILSMIndexFileManager fileManager, FileReference a, FileReference b) diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java index e2a875b..b0ecc1a 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java @@ -38,8 +38,8 @@ public void testPrefixNames() throws HyracksDataException { IODeviceHandle shorter = new IODeviceHandle(new File("/tmp/tst/1"), "storage"); IODeviceHandle longer = new IODeviceHandle(new File("/tmp/tst/11"), "storage"); - IOManager ioManager = - new IOManager(Arrays.asList(new IODeviceHandle[] { shorter, longer }), new DefaultDeviceResolver()); + IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] { shorter, longer }), + new DefaultDeviceResolver(), 2, 10); FileReference f = ioManager.resolveAbsolutePath("/tmp/tst/11/storage/Foo_idx_foo/my_btree"); Assert.assertEquals("/tmp/tst/11/storage/Foo_idx_foo/my_btree", f.getAbsolutePath()); } @@ -48,8 +48,8 @@ public void testDuplicates() throws HyracksDataException { IODeviceHandle first = new IODeviceHandle(new File("/tmp/tst/1"), "storage"); IODeviceHandle second = new IODeviceHandle(new File("/tmp/tst/1"), "storage"); - IOManager ioManager = - new IOManager(Arrays.asList(new IODeviceHandle[] { first, second }), new DefaultDeviceResolver()); + IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] { first, second }), + new DefaultDeviceResolver(), 2, 19); } @After -- To view, visit https://asterix-gerrit.ics.uci.edu/3133 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: merged Gerrit-Change-Id: I1c8ad11c2b8b983ef4bf7cf78c2f068accddfff4 Gerrit-Change-Number: 3133 Gerrit-PatchSet: 9 Gerrit-Owner: Ian Maxon <[email protected]> Gerrit-Reviewer: Anon. E. Moose (1000171) Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Luo Chen <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]>
