LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1382327255


##########
common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java:
##########
@@ -355,26 +355,23 @@ public void close() throws IOException {
     }
   }
 
-  /**
-   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
-   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
-   */
-  void closeIterator(RocksDBIterator<?> it) throws IOException {
-    notifyIteratorClosed(it);
-    synchronized (this._db) {
-      org.rocksdb.RocksDB _db = this._db.get();
-      if (_db != null) {
-        it.close();
-      }
-    }
+  AtomicReference<org.rocksdb.RocksDB> getRocksDB() {

Review Comment:
   Please add a comment explaining the intent of this method and the scope in which it is meant to be used.
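   To make the requested comment concrete, here is a minimal sketch of the intent that the removed `closeIterator` Javadoc described: callers lock the `AtomicReference` and release an iterator handle only while the DB handle is non-null. `NativeDb`, `closeDb` and `closeIteratorIfDbOpen` are hypothetical stand-ins, and the idea that the DB's `close()` clears the reference under the same lock is an assumption, not taken from this diff.

```java
import java.util.concurrent.atomic.AtomicReference;

public class GuardedCloseSketch {

  /** Hypothetical stand-in for the native org.rocksdb.RocksDB handle. */
  static final class NativeDb { }

  private final AtomicReference<NativeDb> db = new AtomicReference<>(new NativeDb());

  /**
   * Returns the holder of the native DB handle. Intended only for iterator cleanup code in
   * this package: callers must synchronize on the returned reference and may release an
   * iterator's native handle only while {@code get()} is non-null, because closing a JNI
   * iterator handle after the DB has been closed can crash the JVM.
   */
  AtomicReference<NativeDb> getRocksDB() {
    return db;
  }

  /** Assumed behavior of the DB's close(): clear the handle under the same lock. */
  void closeDb() {
    synchronized (db) {
      db.set(null);
    }
  }

  /** Caller-side pattern: run the iterator close only if the DB is still open. */
  boolean closeIteratorIfDbOpen(Runnable closeIteratorHandle) {
    AtomicReference<NativeDb> ref = getRocksDB();
    synchronized (ref) {
      if (ref.get() != null) {
        closeIteratorHandle.run();
        return true;
      }
      return false;  // DB already closed: skip the native call
    }
  }

  public static void main(String[] args) {
    GuardedCloseSketch store = new GuardedCloseSketch();
    System.out.println(store.closeIteratorIfDbOpen(() -> { }));  // true: DB still open
    store.closeDb();
    System.out.println(store.closeIteratorIfDbOpen(() -> { }));  // false: handle skipped
  }
}
```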



##########
common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java:
##########
@@ -355,26 +355,23 @@ public void close() throws IOException {
     }
   }
 
-  /**
-   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
-   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
-   */
-  void closeIterator(RocksDBIterator<?> it) throws IOException {
-    notifyIteratorClosed(it);
-    synchronized (this._db) {
-      org.rocksdb.RocksDB _db = this._db.get();
-      if (_db != null) {
-        it.close();
-      }
-    }
+  AtomicReference<org.rocksdb.RocksDB> getRocksDB() {
+    return _db;
+  }
+
+  ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> getIteratorTracker() {

Review Comment:
   ditto



##########
common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java:
##########
@@ -381,6 +382,55 @@ public void testSkipAfterDBClose() throws Exception {
     assertFalse(iter.skip(1));
   }
 
+  @Test
+  public void testResourceCleaner() throws Exception {
+    File dbPathForCleanerTest = File.createTempFile(
+      "test_db_cleaner.", ".rdb");
+    dbPathForCleanerTest.delete();
+
+    RocksDB dbForCleanerTest = new RocksDB(dbPathForCleanerTest);
+    try {
+      for (int i = 0; i < 8192; i++) {
+        dbForCleanerTest.write(createCustomType1(i));
+      }
+      RocksDBIterator<CustomType1> rocksDBIterator =
+        (RocksDBIterator<CustomType1>) dbForCleanerTest.view(CustomType1.class).iterator();
+      Reference<RocksDBIterator<?>> reference =

Review Comment:
   In this case, since we never close the `RocksDBIterator` manually, we can ultimately determine whether it has been closed through `resourceCleaner.getStatus()`.

   So `reference` here is only an auxiliary tool for checking whether the `RocksDBIterator` has been GCed. Could we wrap the iterator in a `WeakReference` manually instead of taking the reference from the `iteratorTracker`? Then we wouldn't need to add an accessor for the `iteratorTracker`.
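   A minimal sketch of the manual `WeakReference` approach, outside Spark: the test keeps its own weak reference to the iterator, drops the strong one, and polls until the GC clears it. `awaitCollected` and the timeout values are hypothetical; `System.gc()` is only a hint, so the check is best-effort and would fail if explicit GC is disabled (e.g. with `-XX:+DisableExplicitGC`).

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.TimeUnit;

public class WeakRefGcCheck {

  /**
   * Polls until the GC has cleared the referent of {@code ref}, or until the timeout expires.
   * Returns true if the referent was collected.
   */
  static boolean awaitCollected(WeakReference<?> ref, long timeoutMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (ref.get() != null && System.nanoTime() - deadline < 0) {
      System.gc();  // only a hint; the JVM is free to ignore it
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return ref.get() == null;
  }

  public static void main(String[] args) {
    Object iterator = new Object();  // stands in for the RocksDBIterator under test
    WeakReference<Object> ref = new WeakReference<>(iterator);
    iterator = null;                 // drop the only strong reference
    System.out.println(awaitCollected(ref, 10_000) ? "collected" : "still reachable");
  }
}
```

   In the real test the strong reference would be kept to the `ResourceCleaner` (via `getResourceCleaner()`), not to the iterator. Because a `Cleaner` runs its action on its own thread after the referent becomes phantom reachable, the `getStatus()` check may itself need to be polled.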



##########
common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:
##########
@@ -272,4 +287,37 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }
 
+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+    private final RocksDB rocksDB;
+    private final AtomicBoolean status = new AtomicBoolean(true);
+
+    ResourceCleaner(RocksIterator rocksIterator, RocksDB rocksDB) {
+      this.rocksIterator = rocksIterator;
+      this.rocksDB = rocksDB;
+    }
+
+    @Override
+    public void run() {
+      if (status.compareAndSet(true, false)) {
+        rocksDB.notifyIteratorClosed(rocksIterator);
+        synchronized (rocksDB.getRocksDB()) {
+          org.rocksdb.RocksDB _db = rocksDB.getRocksDB().get();
+          if (_db != null) {
+            rocksIterator.close();
+          }
+        }
+      }
+    }
+
+    void statusToFalse() {
+      status.set(false);
+    }
+
+    @VisibleForTesting
+    AtomicBoolean getStatus() {

Review Comment:
   I suggest returning a plain `boolean` instead of the `AtomicBoolean`:

   ```java
   boolean getStatus() {
     return status.get();
   }
   ```



##########
common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:
##########
@@ -272,4 +287,37 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }
 
+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+    private final RocksDB rocksDB;
+    private final AtomicBoolean status = new AtomicBoolean(true);

Review Comment:
   Is there a more suitable variable name?
   
   



##########
common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:
##########
@@ -176,22 +183,30 @@ public boolean skip(long n) {
 
   @Override
   public synchronized void close() throws IOException {
-    db.notifyIteratorClosed(this);
+    db.notifyIteratorClosed(it);
     if (!closed) {
-      it.close();
-      closed = true;
-      next = null;
+      try {
+        it.close();
+        closed = true;
+        next = null;
+      } finally {
+        cancelResourceClean();
+      }
     }
   }
 
-  /**
-   * Because it's tricky to expose closeable iterators through many internal APIs, especially
-   * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
-   * the iterator will eventually be released.
-   */
-  @Override
-  protected void finalize() throws Throwable {
-    db.closeIterator(this);
+  private void cancelResourceClean() {
+    this.resourceCleaner.statusToFalse();
+    this.cleanable.clean();
+  }
+
+  @VisibleForTesting
+  ResourceCleaner getResourceCleaner() {
+    return resourceCleaner;
+  }
+
+  public RocksIterator getRocksIterator() {

Review Comment:
   Can we drop the `public` modifier? Also, I'd prefer the name `internalIterator`.



##########
common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:
##########
@@ -272,4 +287,37 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }
 
+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+    private final RocksDB rocksDB;
+    private final AtomicBoolean status = new AtomicBoolean(true);
+
+    ResourceCleaner(RocksIterator rocksIterator, RocksDB rocksDB) {
+      this.rocksIterator = rocksIterator;
+      this.rocksDB = rocksDB;
+    }
+
+    @Override
+    public void run() {
+      if (status.compareAndSet(true, false)) {
+        rocksDB.notifyIteratorClosed(rocksIterator);
+        synchronized (rocksDB.getRocksDB()) {
+          org.rocksdb.RocksDB _db = rocksDB.getRocksDB().get();
+          if (_db != null) {
+            rocksIterator.close();
+          }
+        }
+      }
+    }
+
+    void statusToFalse() {

Review Comment:
   Is there a more suitable method name?



##########
common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:
##########
@@ -176,22 +183,30 @@ public boolean skip(long n) {
 
   @Override
   public synchronized void close() throws IOException {
-    db.notifyIteratorClosed(this);
+    db.notifyIteratorClosed(it);
     if (!closed) {
-      it.close();
-      closed = true;
-      next = null;
+      try {
+        it.close();
+        closed = true;
+        next = null;
+      } finally {
+        cancelResourceClean();
+      }
     }
   }
 
-  /**
-   * Because it's tricky to expose closeable iterators through many internal APIs, especially
-   * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
-   * the iterator will eventually be released.
-   */
-  @Override
-  protected void finalize() throws Throwable {
-    db.closeIterator(this);
+  private void cancelResourceClean() {

Review Comment:
   Please add a Javadoc comment to this method.
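   One possible shape for that comment, shown on a hypothetical stand-in class (only `java.lang.ref.Cleaner` is a real API here; `CancelCleanSketch`, `Action` and `runs` are made up). The point worth documenting is that, once `close()` has already released the native iterator, setting the flag to `false` turns the action into a no-op, and `Cleanable.clean()` then deregisters it so the GC will not run it later; `clean()` invokes the action at most once however often it is called.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CancelCleanSketch {

  private static final Cleaner CLEANER = Cleaner.create();

  /** Cleanup action; counts how many times its body actually ran. */
  static final class Action implements Runnable {
    final AtomicBoolean active = new AtomicBoolean(true);
    final AtomicInteger runs = new AtomicInteger();

    @Override
    public void run() {
      if (active.compareAndSet(true, false)) {
        runs.incrementAndGet();  // stands in for releasing the native iterator
      }
    }
  }

  final Action action = new Action();
  private final Cleaner.Cleanable cleanable = CLEANER.register(this, action);

  /**
   * Cancels the GC-triggered cleanup after the native iterator was already closed
   * explicitly: marks the action inactive so it becomes a no-op, then calls
   * {@link Cleaner.Cleanable#clean()}, which deregisters it and invokes it at most once.
   */
  void cancelResourceClean() {
    action.active.set(false);
    cleanable.clean();
  }

  public static void main(String[] args) {
    CancelCleanSketch s = new CancelCleanSketch();
    s.cancelResourceClean();
    s.cancelResourceClean();                  // safe to repeat
    System.out.println(s.action.runs.get());  // 0: the action body never ran
  }
}
```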
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: reviews-h...@spark.apache.org