This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c15e044d6b [core] Load RocksDB library in advance and catch exceptions
(#6297)
c15e044d6b is described below
commit c15e044d6b856ddddd2cfda9c6212863dae1a08c
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Sep 23 13:36:46 2025 +0800
[core] Load RocksDB library in advance and catch exceptions (#6297)
---
paimon-core/pom.xml | 2 +-
.../paimon/lookup/rocksdb/RocksDBStateFactory.java | 70 +++++++++
.../lookup/rocksdb/RocksDBStateFactoryTest.java | 167 +++++++++++++++++++++
3 files changed, 238 insertions(+), 1 deletion(-)
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 2f2026632e..c2f5103a63 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -242,7 +242,7 @@ under the License.
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
+ <artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<type>jar</type>
<scope>test</scope>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
index 883c82906b..70a6e10e5f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
@@ -18,6 +18,7 @@
package org.apache.paimon.lookup.rocksdb;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.lookup.StateFactory;
@@ -25,21 +26,30 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
/** Factory to create state. */
public class RocksDBStateFactory implements StateFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(RocksDBStateFactory.class);
+
public static final String MERGE_OPERATOR_NAME = "stringappendtest";
+ private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
+
+ private static boolean rocksDbInitialized = false;
private final Options options;
private final String path;
@@ -50,6 +60,11 @@ public class RocksDBStateFactory implements StateFactory {
public RocksDBStateFactory(
String path, org.apache.paimon.options.Options conf, @Nullable
Duration ttlSecs)
throws IOException {
+ try {
+ ensureRocksDBIsLoaded();
+ } catch (Throwable e) {
+ throw new IOException("Could not load the native RocksDB library",
e);
+ }
DBOptions dbOptions =
RocksDBOptions.createDBOptions(
new DBOptions()
@@ -141,4 +156,59 @@ public class RocksDBStateFactory implements StateFactory {
db = null;
}
}
+
+ // ------------------------------------------------------------------------
+ // static library loading utilities
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ static void ensureRocksDBIsLoaded() throws IOException {
+ synchronized (RocksDBStateFactory.class) {
+ if (!rocksDbInitialized) {
+ LOG.info("Attempting to load RocksDB native library");
+
+ Throwable lastException = null;
+ for (int attempt = 1; attempt <= ROCKSDB_LIB_LOADING_ATTEMPTS;
attempt++) {
+ try {
+ // keep same with RocksDB.loadLibrary
+ final String tmpDir =
System.getenv("ROCKSDB_SHAREDLIB_DIR");
+ // explicitly load the JNI dependency if it has not
been loaded before
+ NativeLibraryLoader.getInstance().loadLibrary(tmpDir);
+
+ // this initialization here should validate that the
loading succeeded
+ RocksDB.loadLibrary();
+
+ // seems to have worked
+ LOG.info("Successfully loaded RocksDB native library");
+ rocksDbInitialized = true;
+ return;
+ } catch (Throwable t) {
+ lastException = t;
+ LOG.debug("RocksDB JNI library loading attempt {}
failed", attempt, t);
+ // try to force RocksDB to attempt reloading the
library
+ try {
+ resetRocksDBLoadedFlag();
+ } catch (Throwable tt) {
+ LOG.debug(
+ "Failed to reset 'initialized' flag in
RocksDB native code loader",
+ tt);
+ }
+ }
+ }
+ throw new IOException("Could not load the native RocksDB
library", lastException);
+ }
+ }
+ }
+
+    /**
+     * Sets the private static {@code initialized} field of {@code
+     * org.rocksdb.NativeLibraryLoader} back to {@code false} via reflection.
+     *
+     * @throws Exception if the field cannot be found or modified reflectively
+     */
+    @VisibleForTesting
+    static void resetRocksDBLoadedFlag() throws Exception {
+        final Class<?> loaderClass = org.rocksdb.NativeLibraryLoader.class;
+        final Field loadedFlag = loaderClass.getDeclaredField("initialized");
+        // the field is not public, so access checks have to be lifted first
+        loadedFlag.setAccessible(true);
+        // static field: the target instance is null
+        loadedFlag.setBoolean(null, false);
+    }
+
+    /**
+     * Returns whether {@link #ensureRocksDBIsLoaded()} has successfully loaded the native library.
+     *
+     * <p>The flag is only written while holding the {@code RocksDBStateFactory.class} monitor, so
+     * it is read under the same monitor; an unsynchronized read of the plain static field could
+     * otherwise observe a stale value on another thread.
+     *
+     * @return {@code true} once the library has been loaded by this class
+     */
+    public static boolean rocksDbInitialized() {
+        synchronized (RocksDBStateFactory.class) {
+            return rocksDbInitialized;
+        }
+    }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactoryTest.java
b/paimon-core/src/test/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactoryTest.java
new file mode 100644
index 0000000000..823f77232b
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactoryTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.rocksdb;
+
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.RocksDB;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyString;
+
+/** Test for {@link RocksDBStateFactory}. */
+public class RocksDBStateFactoryTest {
+
+ @TempDir Path tempDir;
+
+ @Test
+ void testRocksDBLibraryLoadSuccess() throws Exception {
+ // Test that RocksDBStateFactory can be created successfully when
RocksDB library loads
+ // properly
+ RocksDBStateFactory factory =
+ new RocksDBStateFactory(tempDir.toString(), new Options(),
null);
+
+ // Verify that the factory is created and can be used
+ assertThat(factory).isNotNull();
+ assertThat(factory.db()).isNotNull();
+ assertThat(factory.path()).isEqualTo(tempDir.toString());
+
+ factory.close();
+ }
+
+ @Test
+ void testEnsureRocksDBIsLoadedSuccess() throws Exception {
+ // Reset the initialization flag first
+ resetRocksDBInitializedFlag();
+
+ // Test successful loading
+ RocksDBStateFactory.ensureRocksDBIsLoaded();
+
+ // Verify that the flag is set to true after successful loading
+ assertThat(RocksDBStateFactory.rocksDbInitialized()).isTrue();
+ }
+
+ @Test
+ void testEnsureRocksDBIsLoadedFailureAfterRetries() throws Exception {
+ // Reset the initialization flag first
+ resetRocksDBInitializedFlag();
+
+ try (MockedStatic<RocksDB> mockedRocksDB =
Mockito.mockStatic(RocksDB.class);
+ MockedStatic<NativeLibraryLoader> mockedNativeLoader =
+ Mockito.mockStatic(NativeLibraryLoader.class)) {
+
+ // Mock NativeLibraryLoader.getInstance()
+ NativeLibraryLoader mockLoader =
Mockito.mock(NativeLibraryLoader.class);
+
mockedNativeLoader.when(NativeLibraryLoader::getInstance).thenReturn(mockLoader);
+
+ // Make both NativeLibraryLoader.loadLibrary and
RocksDB.loadLibrary fail
+ Mockito.doThrow(new RuntimeException("Native library loading
failed"))
+ .when(mockLoader)
+ .loadLibrary(anyString());
+ mockedRocksDB
+ .when(RocksDB::loadLibrary)
+ .thenThrow(new RuntimeException("RocksDB library loading
failed"));
+
+ // Should throw IOException after all retry attempts fail
+ assertThatThrownBy(RocksDBStateFactory::ensureRocksDBIsLoaded)
+ .isInstanceOf(IOException.class)
+ .hasMessage("Could not load the native RocksDB library");
+
+ // Verify that the flag remains false after failure
+ assertThat(RocksDBStateFactory.rocksDbInitialized()).isFalse();
+
+ assertThatThrownBy(
+ () -> new RocksDBStateFactory(tempDir.toString(),
new Options(), null))
+ .isInstanceOf(IOException.class)
+ .hasMessage("Could not load the native RocksDB library");
+ }
+ }
+
+ @Test
+ void testEnsureRocksDBIsLoadedAlreadyInitialized() throws Exception {
+ // Set the initialization flag to true first
+ setRocksDBInitializedFlag(true);
+
+ try (MockedStatic<RocksDB> mockedRocksDB =
Mockito.mockStatic(RocksDB.class)) {
+ // Should not attempt to load library if already initialized
+ RocksDBStateFactory.ensureRocksDBIsLoaded();
+
+ // Verify that RocksDB.loadLibrary was never called
+ mockedRocksDB.verify(Mockito.never(), () -> RocksDB.loadLibrary());
+ }
+ }
+
+ @Test
+ void testEnsureRocksDBIsLoadedWithDifferentExceptionTypes() throws
Exception {
+ // Reset the initialization flag first
+ resetRocksDBInitializedFlag();
+
+ try (MockedStatic<RocksDB> mockedRocksDB =
Mockito.mockStatic(RocksDB.class);
+ MockedStatic<NativeLibraryLoader> mockedNativeLoader =
+ Mockito.mockStatic(NativeLibraryLoader.class)) {
+
+ // Mock NativeLibraryLoader.getInstance()
+ NativeLibraryLoader mockLoader =
Mockito.mock(NativeLibraryLoader.class);
+
mockedNativeLoader.when(NativeLibraryLoader::getInstance).thenReturn(mockLoader);
+
+ // Make NativeLibraryLoader.loadLibrary fail with
UnsatisfiedLinkError
+ Mockito.doThrow(new UnsatisfiedLinkError("Native library not
found"))
+ .when(mockLoader)
+ .loadLibrary(anyString());
+
+ // Make RocksDB.loadLibrary also fail
+ mockedRocksDB
+ .when(RocksDB::loadLibrary)
+ .thenThrow(new UnsatisfiedLinkError("RocksDB library not
found"));
+
+ // Should throw IOException with the correct message
+ assertThatThrownBy(RocksDBStateFactory::ensureRocksDBIsLoaded)
+ .isInstanceOf(IOException.class)
+ .hasMessage("Could not load the native RocksDB library")
+ .hasCauseInstanceOf(UnsatisfiedLinkError.class);
+ }
+ }
+
+ @Test
+ void testResetRocksDBLoadedFlag() throws Exception {
+ // Test the resetRocksDBLoadedFlag method
+ RocksDBStateFactory.resetRocksDBLoadedFlag();
+ }
+
+ // Helper methods for accessing private static fields
+ private void setRocksDBInitializedFlag(boolean value) throws Exception {
+ Field field =
RocksDBStateFactory.class.getDeclaredField("rocksDbInitialized");
+ field.setAccessible(true);
+ field.setBoolean(null, value);
+ }
+
+ private void resetRocksDBInitializedFlag() throws Exception {
+ setRocksDBInitializedFlag(false);
+ }
+}