This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93deb1a709840c2ae8957ea88cd8ffc0929951ad Author: Stephan Ewen <se...@apache.org> AuthorDate: Mon Dec 9 20:22:08 2019 +0100 [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify test to rely on RocksDBResourceContainer for cleanup of native handles --- .../contrib/streaming/state/RocksDBResourceContainer.java | 6 ++++++ .../flink/contrib/streaming/state/RocksDBStateBackend.java | 12 +----------- .../contrib/streaming/state/RocksDBStateBackendTest.java | 12 ++++-------- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index 32c198f..20c3c5c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -106,6 +106,12 @@ final class RocksDBResourceContainer implements AutoCloseable { return opt; } + RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) { + return optionsFactory == null + ? defaultMetricOptions + : optionsFactory.createNativeMetricsOptions(defaultMetricOptions); + } + @Nullable OptionsFactory getOptionsFactory() { return optionsFactory; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 829566d..3ffb021 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -560,7 +560,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu .setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled()) .setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled()) .setNumberOfTransferingThreads(getNumberOfTransferThreads()) - .setNativeMetricOptions(getMemoryWatcherOptions(resourceContainer)); + .setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(defaultMetricOptions)); return builder.build(); } @@ -853,16 +853,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu return resourceContainer.getColumnOptions(); } - public RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBResourceContainer resourceContainer) { - RocksDBNativeMetricOptions options = this.defaultMetricOptions; - OptionsFactory optionsFactory = resourceContainer.getOptionsFactory(); - if (optionsFactory != null) { - options = optionsFactory.createNativeMetricsOptions(options); - } - - return options; - } - /** * Gets the number of threads used to transfer files while snapshotting/restoring. */ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index e59f6e1..b31e190 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -115,14 +115,12 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa private String dbPath; private RocksDB db = null; private ColumnFamilyHandle defaultCFHandle = null; - private ColumnFamilyOptions columnOptions = null; - private RocksDBResourceContainer optionsContainer = null; - private ArrayList<AutoCloseable> handlesToClose = new ArrayList<>(); + private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer(); public void prepareRocksDB() throws Exception { String dbPath = new File(tempFolder.newFolder(), DB_INSTANCE_DIR_STRING).getAbsolutePath(); - columnOptions = PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose); - optionsContainer = new RocksDBResourceContainer(); + ColumnFamilyOptions columnOptions = optionsContainer.getColumnOptions(); + ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); db = RocksDBOperationUtils.openDB(dbPath, Collections.emptyList(), columnFamilyHandles, columnOptions, optionsContainer.getDbOptions()); @@ -157,9 +155,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa } IOUtils.closeQuietly(defaultCFHandle); IOUtils.closeQuietly(db); - IOUtils.closeQuietly(columnOptions); IOUtils.closeQuietly(optionsContainer); - handlesToClose.forEach(IOUtils::closeQuietly); if (allCreatedCloseables != null) { for (RocksObject rocksCloseable : allCreatedCloseables) { @@ -185,7 +181,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa IntSerializer.INSTANCE, spy(db), defaultCFHandle, - PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose)) + optionsContainer.getColumnOptions()) .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing) .build();