LuciferYang commented on code in PR #43502: URL: https://github.com/apache/spark/pull/43502#discussion_r1382327255
########## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java: ########## @@ -355,26 +355,23 @@ public void close() throws IOException { } } - /** - * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle - * with a closed DB can cause JVM crashes, so this ensures that situation does not happen. - */ - void closeIterator(RocksDBIterator<?> it) throws IOException { - notifyIteratorClosed(it); - synchronized (this._db) { - org.rocksdb.RocksDB _db = this._db.get(); - if (_db != null) { - it.close(); - } - } + AtomicReference<org.rocksdb.RocksDB> getRocksDB() { Review Comment: Some comments need to be added to explain the intent and scope of use of this method. ########## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java: ########## @@ -355,26 +355,23 @@ public void close() throws IOException { } } - /** - * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle - * with a closed DB can cause JVM crashes, so this ensures that situation does not happen. 
- */ - void closeIterator(RocksDBIterator<?> it) throws IOException { - notifyIteratorClosed(it); - synchronized (this._db) { - org.rocksdb.RocksDB _db = this._db.get(); - if (_db != null) { - it.close(); - } - } + AtomicReference<org.rocksdb.RocksDB> getRocksDB() { + return _db; + } + + ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> getIteratorTracker() { Review Comment: ditto ########## common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java: ########## @@ -381,6 +382,55 @@ public void testSkipAfterDBClose() throws Exception { assertFalse(iter.skip(1)); } + @Test + public void testResourceCleaner() throws Exception { + File dbPathForCleanerTest = File.createTempFile( + "test_db_cleaner.", ".rdb"); + dbPathForCleanerTest.delete(); + + RocksDB dbForCleanerTest = new RocksDB(dbPathForCleanerTest); + try { + for (int i = 0; i < 8192; i++) { + dbForCleanerTest.write(createCustomType1(i)); + } + RocksDBIterator<CustomType1> rocksDBIterator = + (RocksDBIterator<CustomType1>) dbForCleanerTest.view(CustomType1.class).iterator(); + Reference<RocksDBIterator<?>> reference = Review Comment: In this case, since we didn't manually close the `RocksDBIterator`, we can ultimately determine whether the `RocksDBIterator` is closed through `resourceCleaner.getStatus()`. So here, `reference` is just an auxiliary tool to judge whether the `RocksDBIterator` has been GCed. So can we change it to manually wrap a `WeakReference` instead of getting it from the `iteratorTracker`, so that we don't need to add an `Accessor` for the `iteratorTracker`? 
########## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java: ########## @@ -272,4 +287,37 @@ static int compare(byte[] a, byte[] b) { return a.length - b.length; } + static class ResourceCleaner implements Runnable { + + private final RocksIterator rocksIterator; + private final RocksDB rocksDB; + private final AtomicBoolean status = new AtomicBoolean(true); + + ResourceCleaner(RocksIterator rocksIterator, RocksDB rocksDB) { + this.rocksIterator = rocksIterator; + this.rocksDB = rocksDB; + } + + @Override + public void run() { + if (status.compareAndSet(true, false)) { + rocksDB.notifyIteratorClosed(rocksIterator); + synchronized (rocksDB.getRocksDB()) { + org.rocksdb.RocksDB _db = rocksDB.getRocksDB().get(); + if (_db != null) { + rocksIterator.close(); + } + } + } + } + + void statusToFalse() { + status.set(false); + } + + @VisibleForTesting + AtomicBoolean getStatus() { Review Comment: I suggest changing it to: ```java boolean getStatus() { return status.get(); } ``` ########## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java: ########## @@ -272,4 +287,37 @@ static int compare(byte[] a, byte[] b) { return a.length - b.length; } + static class ResourceCleaner implements Runnable { + + private final RocksIterator rocksIterator; + private final RocksDB rocksDB; + private final AtomicBoolean status = new AtomicBoolean(true); Review Comment: Is there a more suitable variable name?
########## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java: ########## @@ -176,22 +183,30 @@ public boolean skip(long n) { @Override public synchronized void close() throws IOException { - db.notifyIteratorClosed(this); + db.notifyIteratorClosed(it); if (!closed) { - it.close(); - closed = true; - next = null; + try { + it.close(); + closed = true; + next = null; + } finally { + cancelResourceClean(); + } } } - /** - * Because it's tricky to expose closeable iterators through many internal APIs, especially - * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by - * the iterator will eventually be released. - */ - @Override - protected void finalize() throws Throwable { - db.closeIterator(this); + private void cancelResourceClean() { + this.resourceCleaner.statusToFalse(); + this.cleanable.clean(); + } + + @VisibleForTesting + ResourceCleaner getResourceCleaner() { + return resourceCleaner; + } + + public RocksIterator getRocksIterator() { Review Comment: Can we remove the `public` modifier? Also, I'd prefer the name `internalIterator`. ########## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java: ########## @@ -272,4 +287,37 @@ static int compare(byte[] a, byte[] b) { return a.length - b.length; } + static class ResourceCleaner implements Runnable { + + private final RocksIterator rocksIterator; + private final RocksDB rocksDB; + private final AtomicBoolean status = new AtomicBoolean(true); + + ResourceCleaner(RocksIterator rocksIterator, RocksDB rocksDB) { + this.rocksIterator = rocksIterator; + this.rocksDB = rocksDB; + } + + @Override + public void run() { + if (status.compareAndSet(true, false)) { + rocksDB.notifyIteratorClosed(rocksIterator); + synchronized (rocksDB.getRocksDB()) { + org.rocksdb.RocksDB _db = rocksDB.getRocksDB().get(); + if (_db != null) { + rocksIterator.close(); + } + } + } + } + + void statusToFalse() { Review Comment: Is there a more suitable method name?
########## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java: ########## @@ -176,22 +183,30 @@ public boolean skip(long n) { @Override public synchronized void close() throws IOException { - db.notifyIteratorClosed(this); + db.notifyIteratorClosed(it); if (!closed) { - it.close(); - closed = true; - next = null; + try { + it.close(); + closed = true; + next = null; + } finally { + cancelResourceClean(); + } } } - /** - * Because it's tricky to expose closeable iterators through many internal APIs, especially - * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by - * the iterator will eventually be released. - */ - @Override - protected void finalize() throws Throwable { - db.closeIterator(this); + private void cancelResourceClean() { Review Comment: Method comments need to be added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org