This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 45ba7ee87ca [FLINK-32326][state] Disable WAL in 
RocksDBWriteBatchWrapper by default. (#22771)
45ba7ee87ca is described below

commit 45ba7ee87caee63a0babfd421b7c5eabaa779baa
Author: Stefan Richter <srich...@apache.org>
AuthorDate: Wed Jun 14 19:23:48 2023 +0200

    [FLINK-32326][state] Disable WAL in RocksDBWriteBatchWrapper by default. 
(#22771)
    
    * [FLINK-32326][state] Disable WAL in RocksDBWriteBatchWrapper by default.
    
    Disables WAL by default in RocksDBWriteBatchWrapper for the case that no 
explicit WriteOptions are passed in. This is the case in all restore operations, 
where writing the WAL can impact the performance.
    
    * [hotfix][state] Replace deprecated API call WriteBatch::remove with 
WriteBatch::delete.
---
 .../streaming/state/RocksDBWriteBatchWrapper.java  | 40 ++++++++++++++--------
 .../state/RocksDBWriteBatchWrapperTest.java        | 37 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 14 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
index 3906c74972c..354009e335c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
@@ -32,6 +32,9 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * It's a wrapper class around RocksDB's {@link WriteBatch} for writing in 
bulk.
  *
@@ -55,6 +58,9 @@ public class RocksDBWriteBatchWrapper implements 
AutoCloseable {
 
     @Nonnegative private final long batchSize;
 
+    /** List of all objects that we need to close in close(). */
+    private final List<AutoCloseable> toClose;
+
     public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, long 
writeBatchSize) {
         this(rocksDB, null, 500, writeBatchSize);
     }
@@ -79,9 +85,9 @@ public class RocksDBWriteBatchWrapper implements 
AutoCloseable {
         Preconditions.checkArgument(batchSize >= 0, "Max batch size have to be 
no negative.");
 
         this.db = rocksDB;
-        this.options = options;
         this.capacity = capacity;
         this.batchSize = batchSize;
+        this.toClose = new ArrayList<>(2);
         if (this.batchSize > 0) {
             this.batch =
                     new WriteBatch(
@@ -89,6 +95,15 @@ public class RocksDBWriteBatchWrapper implements 
AutoCloseable {
         } else {
             this.batch = new WriteBatch(this.capacity * PER_RECORD_BYTES);
         }
+        this.toClose.add(this.batch);
+        if (options != null) {
+            this.options = options;
+        } else {
+            // Use default write options with disabled WAL
+            this.options = new WriteOptions().setDisableWAL(true);
+            // We own this object, so we must ensure that we close it.
+            this.toClose.add(this.options);
+        }
     }
 
     public void put(@Nonnull ColumnFamilyHandle handle, @Nonnull byte[] key, 
@Nonnull byte[] value)
@@ -102,33 +117,30 @@ public class RocksDBWriteBatchWrapper implements 
AutoCloseable {
     public void remove(@Nonnull ColumnFamilyHandle handle, @Nonnull byte[] key)
             throws RocksDBException {
 
-        batch.remove(handle, key);
+        batch.delete(handle, key);
 
         flushIfNeeded();
     }
 
     public void flush() throws RocksDBException {
-        if (options != null) {
-            db.write(options, batch);
-        } else {
-            // use the default WriteOptions, if wasn't provided.
-            try (WriteOptions writeOptions = new WriteOptions()) {
-                db.write(writeOptions, batch);
-            }
-        }
+        db.write(options, batch);
         batch.clear();
     }
 
-    public WriteOptions getOptions() {
+    @VisibleForTesting
+    WriteOptions getOptions() {
         return options;
     }
 
     @Override
     public void close() throws RocksDBException {
-        if (batch.count() != 0) {
-            flush();
+        try {
+            if (batch.count() != 0) {
+                flush();
+            }
+        } finally {
+            IOUtils.closeAllQuietly(toClose);
         }
-        IOUtils.closeQuietly(batch);
     }
 
     private void flushIfNeeded() throws RocksDBException {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
index c7b725fe73c..6a7c95af067 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
@@ -35,6 +35,8 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /** Tests to guard {@link RocksDBWriteBatchWrapper}. */
 public class RocksDBWriteBatchWrapperTest {
@@ -123,4 +125,39 @@ public class RocksDBWriteBatchWrapperTest {
             assertEquals(initBatchSize, writeBatchWrapper.getDataSize());
         }
     }
+
+    /**
+     * Test that {@link RocksDBWriteBatchWrapper} creates default {@link 
WriteOptions} with disabled
+     * WAL and closes them correctly.
+     */
+    @Test
+    public void testDefaultWriteOptionsHaveDisabledWAL() throws Exception {
+        WriteOptions options;
+        try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+                RocksDBWriteBatchWrapper writeBatchWrapper =
+                        new RocksDBWriteBatchWrapper(db, null, 200, 50)) {
+            options = writeBatchWrapper.getOptions();
+            assertTrue(options.isOwningHandle());
+            assertTrue(options.disableWAL());
+        }
+        assertFalse(options.isOwningHandle());
+    }
+
+    /**
+     * Test that {@link RocksDBWriteBatchWrapper} respects passed in {@link 
WriteOptions} and does
+     * not close them.
+     */
+    @Test
+    public void testNotClosingPassedInWriteOption() throws Exception {
+        try (WriteOptions passInOption = new 
WriteOptions().setDisableWAL(false)) {
+            try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+                    RocksDBWriteBatchWrapper writeBatchWrapper =
+                            new RocksDBWriteBatchWrapper(db, passInOption, 
200, 50)) {
+                WriteOptions options = writeBatchWrapper.getOptions();
+                assertTrue(options.isOwningHandle());
+                assertFalse(options.disableWAL());
+            }
+            assertTrue(passInOption.isOwningHandle());
+        }
+    }
 }

Reply via email to