This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new b3eb7a4 [NO ISSUE] Make IOManager more configurable b3eb7a4 is described below commit b3eb7a4e8404cc0c822d3d06dc059368c5633801 Author: Ian Maxon <ima...@apache.org> AuthorDate: Wed Apr 17 16:42:55 2019 -0700 [NO ISSUE] Make IOManager more configurable Change-Id: I1c8ad11c2b8b983ef4bf7cf78c2f068accddfff4 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3133 Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Contrib: Michael Blow <mb...@apache.org> --- .../org/apache/asterix/app/nc/NCAppRuntimeContext.java | 6 ++++-- .../hyracks/control/common/controllers/NCConfig.java | 18 ++++++++++++++++-- .../hyracks/control/nc/NodeControllerService.java | 4 ++-- .../org/apache/hyracks/control/nc/io/IOManager.java | 10 +++++----- .../hyracks/examples/btree/helper/RuntimeContext.java | 2 +- .../hyracks/hyracks-storage-common/pom.xml | 5 ----- .../storage/common/buffercache/BufferCache.java | 13 ++++++------- .../support/TestStorageManagerComponentHolder.java | 4 ++-- .../org/apache/hyracks/test/support/TestUtils.java | 2 +- .../am/lsm/common/test/LSMIndexFileManagerTest.java | 2 +- .../hyracks/storage/common/IOManagerPathTest.java | 8 ++++---- 11 files changed, 42 insertions(+), 32 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index e663b49..8b8f5a0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -89,6 +89,7 @@ import org.apache.hyracks.api.io.IPersistedResourceRegistry; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import 
org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.network.INetworkSecurityManager; +import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.ipc.impl.HyracksConnection; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; @@ -185,6 +186,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory, boolean initialRun) throws IOException { ioManager = getServiceContext().getIoManager(); + int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE); threadExecutor = MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory()); ICacheMemoryAllocator allocator = new HeapBufferAllocator(); @@ -239,11 +241,11 @@ public class NCAppRuntimeContext implements INcApplicationContext { replicationChannel = new ReplicationChannel(this); bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), - storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory(), + storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory(), replicationManager); } else { bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), - storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory()); + storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory()); } /* diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index 3619cbb..dd92798 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -88,7 +88,9 @@ public class NCConfig extends ControllerConfig { TRACE_CATEGORIES(STRING_ARRAY, new String[0]), KEY_STORE_PATH(STRING, (String) null), TRUST_STORE_PATH(STRING, (String) null), - KEY_STORE_PASSWORD(STRING, (String) null); + KEY_STORE_PASSWORD(STRING, (String) null), + IO_WORKERS_PER_PARTITION(POSITIVE_INTEGER, 2), + IO_QUEUE_SIZE(POSITIVE_INTEGER, 10); private final IOptionType parser; private final String defaultValueDescription; @@ -217,8 +219,12 @@ public class NCConfig extends ControllerConfig { return "A fully-qualified path to a trust store file that will be used for secured connections"; case KEY_STORE_PASSWORD: return "The password to the provided key store"; + case IO_WORKERS_PER_PARTITION: + return "Number of threads per partition used to write and read from storage"; + case IO_QUEUE_SIZE: + return "Length of the queue used for requests to write and read"; default: - throw new IllegalStateException("NYI: " + this); + throw new IllegalStateException("Not yet implemented: " + this); } } @@ -575,4 +581,12 @@ public class NCConfig extends ControllerConfig { public void setTrustStorePath(String keyStorePath) { configManager.set(nodeId, Option.TRUST_STORE_PATH, keyStorePath); } + + public int getIOParallelism() { + return appConfig.getInt(Option.IO_WORKERS_PER_PARTITION); + } + + public int getIOQueueSize() { + return appConfig.getInt(Option.IO_QUEUE_SIZE); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 317d59a..517169b 
100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -214,8 +214,8 @@ public class NodeControllerService implements IControllerService { ncShutdownHook = new NCShutdownHook(this); Runtime.getRuntime().addShutdownHook(ncShutdownHook); Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager()); - ioManager = - new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver()); + ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), + application.getFileDeviceResolver(), ncConfig.getIOParallelism(), ncConfig.getIOQueueSize()); try { workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. jobletMap = new ConcurrentHashMap<>(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java index b5cb21a..14404d2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java @@ -56,7 +56,6 @@ public class IOManager implements IIOManager { /* * Constants */ - public static final int IO_REQUEST_QUEUE_SIZE = 100; // TODO: Make configurable private static final Logger LOGGER = LogManager.getLogger(); private static final String WORKSPACE_FILE_SUFFIX = ".waf"; private static final FilenameFilter WORKSPACE_FILES_FILTER = (dir, name) -> name.endsWith(WORKSPACE_FILE_SUFFIX); @@ -74,7 +73,8 @@ public class IOManager implements 
IIOManager { private int workspaceIndex; private final IFileDeviceResolver deviceComputer; - public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer) throws HyracksDataException { + public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism, int queueSize) + throws HyracksDataException { this.ioDevices = Collections.unmodifiableList(devices); checkDeviceValidity(devices); workspaces = new ArrayList<>(); @@ -93,9 +93,9 @@ public class IOManager implements IIOManager { } workspaceIndex = 0; this.deviceComputer = deviceComputer; - submittedRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE); - freeRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE); - int numIoThreads = ioDevices.size() * 2; + submittedRequests = new ArrayBlockingQueue<>(queueSize); + freeRequests = new ArrayBlockingQueue<>(queueSize); + int numIoThreads = ioDevices.size() * ioParallelism; executor = Executors.newFixedThreadPool(numIoThreads); for (int i = 0; i < numIoThreads; i++) { executor.execute(new IoRequestHandler(i, submittedRequests)); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java index d107ff2..7ba81e1 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java @@ -60,7 +60,7 @@ public class RuntimeContext { ICacheMemoryAllocator allocator = new HeapBufferAllocator(); IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, 32768, 50); bufferCache = new BufferCache(appCtx.getIoManager(), prs, new 
DelayPageCleanerPolicy(1000), fileMapManager, 100, - threadFactory); + 10, threadFactory); ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory(); localResourceRepository = localResourceRepositoryFactory.createRepository(); resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml index 423925b..31037ff 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml @@ -50,11 +50,6 @@ </dependency> <dependency> <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-control-nc</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-util</artifactId> <version>${project.version}</version> </dependency> diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index b40f252..c4c5d8a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -18,8 +18,6 @@ */ package org.apache.hyracks.storage.common.buffercache; -import static org.apache.hyracks.control.nc.io.IOManager.IO_REQUEST_QUEUE_SIZE; - import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -76,8 +74,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { private final CleanerThread cleanerThread; private final Map<Integer, BufferedFileHandle> 
fileInfoMap; private final AsyncFIFOPageQueueManager fifoWriter; - private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache = - new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE); + private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache; private IIOReplicationManager ioReplicationManager; private final List<ICachedPageInternal> cachedPages = new ArrayList<>(); @@ -93,8 +90,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner; public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy, - IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, + IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, int ioQueuelen, ThreadFactory threadFactory) { + this.headerPageCache = new ArrayBlockingQueue<>(ioQueuelen); this.ioManager = ioManager; this.pageSize = pageReplacementStrategy.getPageSize(); this.maxOpenFiles = maxOpenFiles; @@ -124,10 +122,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { //this constructor is used when replication is enabled to pass the IIOReplicationManager public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy, - IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, + IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, int ioQueueLen, ThreadFactory threadFactory, IIOReplicationManager ioReplicationManager) { - this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, threadFactory); + this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, ioQueueLen, + threadFactory); this.ioReplicationManager = ioReplicationManager; } diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java index 7b7e850..a2da285 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java @@ -107,7 +107,7 @@ public class TestStorageManagerComponentHolder { List<IODeviceHandle> devices = new ArrayList<>(); devices.add(new IODeviceHandle(new File(System.getProperty("user.dir") + File.separator + "target"), "iodev_test_wa")); - ioManager = new IOManager(devices, new DefaultDeviceResolver()); + ioManager = new IOManager(devices, new DefaultDeviceResolver(), 2, 10); } return ioManager; } @@ -152,7 +152,7 @@ public class TestStorageManagerComponentHolder { IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, pageSize, numPages); IFileMapProvider fileMapProvider = getFileMapProvider(); bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000), - (IFileMapManager) fileMapProvider, maxOpenFiles, threadFactory); + (IFileMapManager) fileMapProvider, maxOpenFiles, 10, threadFactory); return bufferCache; } } diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java index ebfaeb8..e36b655 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java @@ -76,7 +76,7 @@ public class TestUtils { private static IOManager 
createIoManager() throws HyracksException { List<IODeviceHandle> devices = new ArrayList<>(); devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), ".")); - return new IOManager(devices, new DefaultDeviceResolver()); + return new IOManager(devices, new DefaultDeviceResolver(), 2, 10); } public static void compareWithResult(File expectedFile, File actualFile) throws Exception { diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java index 22456e8..5d10603 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java @@ -256,7 +256,7 @@ public class LSMIndexFileManagerTest { String iodevPath = System.getProperty("java.io.tmpdir") + sep + "test_iodev" + i; devices.add(new IODeviceHandle(new File(iodevPath), "wa")); } - return new IOManager(devices, new DefaultDeviceResolver()); + return new IOManager(devices, new DefaultDeviceResolver(), 2, 10); } private FileReference simulateMerge(ILSMIndexFileManager fileManager, FileReference a, FileReference b) diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java index e2a875b..b0ecc1a 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java @@ -38,8 +38,8 @@ public class IOManagerPathTest { public void testPrefixNames() throws HyracksDataException { IODeviceHandle shorter = new IODeviceHandle(new File("/tmp/tst/1"), "storage"); IODeviceHandle longer = new IODeviceHandle(new File("/tmp/tst/11"), "storage"); - IOManager ioManager = - new IOManager(Arrays.asList(new IODeviceHandle[] { shorter, longer }), new DefaultDeviceResolver()); + IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] { shorter, longer }), + new DefaultDeviceResolver(), 2, 10); FileReference f = ioManager.resolveAbsolutePath("/tmp/tst/11/storage/Foo_idx_foo/my_btree"); Assert.assertEquals("/tmp/tst/11/storage/Foo_idx_foo/my_btree", f.getAbsolutePath()); } @@ -48,8 +48,8 @@ public class IOManagerPathTest { public void testDuplicates() throws HyracksDataException { IODeviceHandle first = new IODeviceHandle(new File("/tmp/tst/1"), "storage"); IODeviceHandle second = new IODeviceHandle(new File("/tmp/tst/1"), "storage"); - IOManager ioManager = - new IOManager(Arrays.asList(new IODeviceHandle[] { first, second }), new DefaultDeviceResolver()); + IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] { first, second }), + new DefaultDeviceResolver(), 2, 19); } @After