This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 61007c9  [FLINK-22369][rocksdb] RocksDB state backend might occur 
ClassNotFoundException when deserializing on TM side
61007c9 is described below

commit 61007c9ff7d1619bbf0b2c1f72bfb3c39aca505d
Author: Seth Wiesman <sjwies...@gmail.com>
AuthorDate: Tue Apr 20 08:48:32 2021 -0500

    [FLINK-22369][rocksdb] RocksDB state backend might occur 
ClassNotFoundException when deserializing on TM side
---
 .../state/EmbeddedRocksDBStateBackend.java         | 42 ++++++++--------------
 .../streaming/state/RocksDBStateBackend.java       |  3 +-
 .../contrib/streaming/state/RocksDBInitTest.java   |  2 +-
 3 files changed, 16 insertions(+), 31 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index c2651f9..d7dc578 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -112,8 +111,6 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
 
     private static final long UNDEFINED_WRITE_BATCH_SIZE = -1;
 
-    private Logger logger = LOG;
-
     // ------------------------------------------------------------------------
 
     // -- configuration values, set in the application / configuration
@@ -260,7 +257,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                 original.predefinedOptions == null
                         ? 
PredefinedOptions.valueOf(config.get(RocksDBOptions.PREDEFINED_OPTIONS))
                         : original.predefinedOptions;
-        logger.info("Using predefined options: {}.", predefinedOptions.name());
+        LOG.info("Using predefined options: {}.", predefinedOptions.name());
 
         // configure RocksDB options factory
         try {
@@ -278,16 +275,6 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
         latencyTrackingConfigBuilder = 
original.latencyTrackingConfigBuilder.configure(config);
     }
 
-    /**
-     * Overrides the default logger for this class. It ensures users of the 
legacy {@link
-     * RocksDBStateBackend} see consistent logging.
-     */
-    @Internal
-    @SuppressWarnings("SameParameterValue")
-    void setLogger(Logger logger) {
-        this.logger = logger;
-    }
-
     // ------------------------------------------------------------------------
     //  Reconfiguration
     // ------------------------------------------------------------------------
@@ -334,7 +321,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                             "Local DB files directory '"
                                     + f
                                     + "' does not exist and cannot be created. 
";
-                    logger.error(msg);
+                    LOG.error(msg);
                     errorMessage.append(msg);
                 } else {
                     dirs.add(f);
@@ -415,7 +402,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
         // first, make sure that the RocksDB JNI library is loaded
         // we do this explicitly here to have better error handling
         String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
-        ensureRocksDBIsLoaded(tempDir, logger);
+        ensureRocksDBIsLoaded(tempDir);
 
         // replace all characters that are not legal for filenames with 
underscore
         String fileCompatibleIdentifier = 
operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
@@ -437,10 +424,9 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
 
         final OpaqueMemoryResource<RocksDBSharedResources> sharedResources =
                 RocksDBOperationUtils.allocateSharedCachesIfConfigured(
-                        memoryConfiguration, env.getMemoryManager(), 
managedMemoryFraction, logger);
+                        memoryConfiguration, env.getMemoryManager(), 
managedMemoryFraction, LOG);
         if (sharedResources != null) {
-            logger.info(
-                    "Obtained shared RocksDB cache of size {} bytes", 
sharedResources.getSize());
+            LOG.info("Obtained shared RocksDB cache of size {} bytes", 
sharedResources.getSize());
         }
         final RocksDBResourceContainer resourceContainer =
                 createOptionsAndResourceContainer(sharedResources);
@@ -512,7 +498,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                         ((ConfigurableRocksDBOptionsFactory) 
originalOptionsFactory)
                                 .configure(config);
             }
-            logger.info("Using application-defined options factory: {}.", 
originalOptionsFactory);
+            LOG.info("Using application-defined options factory: {}.", 
originalOptionsFactory);
 
             return originalOptionsFactory;
         }
@@ -523,7 +509,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
             DefaultConfigurableOptionsFactory optionsFactory =
                     new DefaultConfigurableOptionsFactory();
             optionsFactory.configure(config);
-            logger.info("Using default options factory: {}.", optionsFactory);
+            LOG.info("Using default options factory: {}.", optionsFactory);
 
             return optionsFactory;
         } else {
@@ -537,7 +523,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                     optionsFactory =
                             ((ConfigurableRocksDBOptionsFactory) 
optionsFactory).configure(config);
                 }
-                logger.info("Using configured options factory: {}.", 
optionsFactory);
+                LOG.info("Using configured options factory: {}.", 
optionsFactory);
 
                 return optionsFactory;
             } catch (ClassNotFoundException e) {
@@ -834,12 +820,12 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
     // ------------------------------------------------------------------------
 
     @VisibleForTesting
-    static void ensureRocksDBIsLoaded(String tempDirectory, Logger logger) 
throws IOException {
+    static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException 
{
         synchronized (EmbeddedRocksDBStateBackend.class) {
             if (!rocksDbInitialized) {
 
                 final File tempDirParent = new 
File(tempDirectory).getAbsoluteFile();
-                logger.info(
+                LOG.info(
                         "Attempting to load RocksDB native library and store 
it under '{}'",
                         tempDirParent);
 
@@ -863,7 +849,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                         rocksLibFolder = new File(tempDirParent, 
"rocksdb-lib-" + new AbstractID());
 
                         // make sure the temp path exists
-                        logger.debug(
+                        LOG.debug(
                                 "Attempting to create RocksDB native library 
folder {}",
                                 rocksLibFolder);
                         // noinspection ResultOfMethodCallIgnored
@@ -877,18 +863,18 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                         RocksDB.loadLibrary();
 
                         // seems to have worked
-                        logger.info("Successfully loaded RocksDB native 
library");
+                        LOG.info("Successfully loaded RocksDB native library");
                         rocksDbInitialized = true;
                         return;
                     } catch (Throwable t) {
                         lastException = t;
-                        logger.debug("RocksDB JNI library loading attempt {} 
failed", attempt, t);
+                        LOG.debug("RocksDB JNI library loading attempt {} 
failed", attempt, t);
 
                         // try to force RocksDB to attempt reloading the 
library
                         try {
                             resetRocksDBLoadedFlag();
                         } catch (Throwable tt) {
-                            logger.debug(
+                            LOG.debug(
                                     "Failed to reset 'initialized' flag in 
RocksDB native code loader",
                                     tt);
                         }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index f630ee2..3b0a71d 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -216,7 +216,6 @@ public class RocksDBStateBackend extends 
AbstractManagedMemoryStateBackend
         }
         this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
         this.rocksDBStateBackend = new 
EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing);
-        this.rocksDBStateBackend.setLogger(LOG);
     }
 
     /** @deprecated Use {@link #RocksDBStateBackend(StateBackend)} instead. */
@@ -588,7 +587,7 @@ public class RocksDBStateBackend extends 
AbstractManagedMemoryStateBackend
 
     @VisibleForTesting
     static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException 
{
-        EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempDirectory, LOG);
+        EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempDirectory);
     }
 
     @VisibleForTesting
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitTest.java
index ff68a37..578507a 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitTest.java
@@ -62,7 +62,7 @@ public class RocksDBInitTest {
 
         File tempFolder = temporaryFolder.newFolder();
         try {
-            
EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempFolder.getAbsolutePath(), 
LOG);
+            
EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempFolder.getAbsolutePath());
             fail("Not throwing expected exception.");
         } catch (IOException ignored) {
             // ignored

Reply via email to