This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 658fac3  [FLINK-23840][runtime][logging] Clarify logging around CheckpointStorage
658fac3 is described below

commit 658fac3736b73adf54b629242ede91313947e7e1
Author: David Moravek <d...@apache.org>
AuthorDate: Tue Aug 24 10:05:56 2021 +0200

    [FLINK-23840][runtime][logging] Clarify logging around CheckpointStorage
---
 .../runtime/state/CheckpointStorageLoader.java     | 48 ++++++++++++++++------
 .../state/memory/MemCheckpointStreamFactory.java   |  9 ++--
 .../runtime/state/CheckpointStorageLoaderTest.java | 31 +++++++-------
 3 files changed, 54 insertions(+), 34 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
index f1e90a0..044d230 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
@@ -44,6 +44,9 @@ public class CheckpointStorageLoader {
 
     private static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
 
+    private static final String LEGACY_PRECEDENCE_LOG_MESSAGE =
+            "Legacy state backends can also be used as checkpoint storage and 
take precedence for backward-compatibility reasons.";
+
     /**
      * Loads the checkpoint storage from the configuration, from the parameter
      * 'state.checkpoint-storage', as defined in {@link 
CheckpointingOptions#CHECKPOINT_STORAGE}.
@@ -187,27 +190,46 @@ public class CheckpointStorageLoader {
                 logger.info(
                         "Using legacy state backend {} as Job checkpoint 
storage",
                         rootStateBackend);
+                if (fromApplication != null) {
+                    logger.warn(
+                            "Checkpoint storage passed via 
StreamExecutionEnvironment is ignored because legacy state backend '{}' is 
used. {}",
+                            rootStateBackend.getClass().getName(),
+                            LEGACY_PRECEDENCE_LOG_MESSAGE);
+                }
+                if (config.get(CheckpointingOptions.CHECKPOINT_STORAGE) != 
null) {
+                    logger.warn(
+                            "Config option '{}' is ignored because legacy 
state backend '{}' is used. {}",
+                            CheckpointingOptions.CHECKPOINT_STORAGE.key(),
+                            rootStateBackend.getClass().getName(),
+                            LEGACY_PRECEDENCE_LOG_MESSAGE);
+                }
             }
-
             return (CheckpointStorage) rootStateBackend;
-        } else if (fromApplication instanceof ConfigurableCheckpointStorage) {
-            if (logger != null) {
-                logger.info(
-                        "Using job/cluster config to configure 
application-defined checkpoint storage: {}",
-                        fromApplication);
-            }
+        }
 
-            return ((ConfigurableCheckpointStorage) 
fromApplication).configure(config, classLoader);
-        } else if (fromApplication != null) {
+        if (fromApplication != null) {
+            if (fromApplication instanceof ConfigurableCheckpointStorage) {
+                if (logger != null) {
+                    logger.info(
+                            "Using job/cluster config to configure 
application-defined checkpoint storage: {}",
+                            fromApplication);
+                    if (config.get(CheckpointingOptions.CHECKPOINT_STORAGE) != 
null) {
+                        logger.warn(
+                                "Config option '{}' is ignored because the 
checkpoint storage passed via StreamExecutionEnvironment takes precedence.",
+                                CheckpointingOptions.CHECKPOINT_STORAGE.key());
+                    }
+                }
+                return ((ConfigurableCheckpointStorage) fromApplication)
+                        .configure(config, classLoader);
+            }
             if (logger != null) {
                 logger.info("Using application defined checkpoint storage: 
{}", fromApplication);
             }
-
             return fromApplication;
-        } else {
-            return fromConfig(config, classLoader, logger)
-                    .orElseGet(() -> createDefaultCheckpointStorage(config, 
classLoader, logger));
         }
+
+        return fromConfig(config, classLoader, logger)
+                .orElseGet(() -> createDefaultCheckpointStorage(config, 
classLoader, logger));
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 9bef67b..5f36a42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 
 import javax.annotation.Nullable;
 
@@ -59,11 +60,9 @@ public class MemCheckpointStreamFactory implements 
CheckpointStreamFactory {
     static void checkSize(int size, int maxSize) throws IOException {
         if (size > maxSize) {
             throw new IOException(
-                    "Size of the state is larger than the maximum permitted 
memory-backed state. Size="
-                            + size
-                            + " , maxSize="
-                            + maxSize
-                            + " . Consider using a different state backend, 
like the File System State backend.");
+                    String.format(
+                            "Size of the state is larger than the maximum 
permitted memory-backed state. Size=%d, maxSize=%d. Consider using a different 
checkpoint storage, like the %s.",
+                            size, maxSize, 
FileSystemCheckpointStorage.class.getSimpleName()));
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoaderTest.java
index fb0e4fd..e4aa5c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoaderTest.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -50,7 +51,7 @@ import java.io.IOException;
 import java.util.Collection;
 
 /** This test validates that checkpoint storage is properly loaded from 
configuration. */
-public class CheckpointStorageLoaderTest {
+public class CheckpointStorageLoaderTest extends TestLogger {
 
     @Rule public final TemporaryFolder tmp = new TemporaryFolder();
 
@@ -68,7 +69,7 @@ public class CheckpointStorageLoaderTest {
         CheckpointStorage storage = new MockStorage();
 
         CheckpointStorage configured =
-                CheckpointStorageLoader.load(storage, null, legacy, new 
Configuration(), cl, null);
+                CheckpointStorageLoader.load(storage, null, legacy, new 
Configuration(), cl, log);
 
         Assert.assertEquals(
                 "Legacy state backends should always take precedence", legacy, 
configured);
@@ -80,7 +81,7 @@ public class CheckpointStorageLoaderTest {
         CheckpointStorage storage = new MockStorage();
 
         CheckpointStorage configured =
-                CheckpointStorageLoader.load(storage, null, modern, new 
Configuration(), cl, null);
+                CheckpointStorageLoader.load(storage, null, modern, new 
Configuration(), cl, log);
 
         Assert.assertEquals(
                 "Modern state backends should never take precedence", storage, 
configured);
@@ -92,8 +93,7 @@ public class CheckpointStorageLoaderTest {
 
         config.set(CheckpointingOptions.CHECKPOINT_STORAGE, 
WorkingFactory.class.getName());
         CheckpointStorage storage =
-                CheckpointStorageLoader.load(
-                        null, null, new ModernStateBackend(), config, cl, 
null);
+                CheckpointStorageLoader.load(null, null, new 
ModernStateBackend(), config, cl, log);
         Assert.assertThat(storage, Matchers.instanceOf(MockStorage.class));
     }
 
@@ -101,7 +101,7 @@ public class CheckpointStorageLoaderTest {
     public void testDefaultCheckpointStorage() throws Exception {
         CheckpointStorage storage1 =
                 CheckpointStorageLoader.load(
-                        null, null, new ModernStateBackend(), new 
Configuration(), cl, null);
+                        null, null, new ModernStateBackend(), new 
Configuration(), cl, log);
 
         Assert.assertThat(storage1, 
Matchers.instanceOf(JobManagerCheckpointStorage.class));
 
@@ -109,8 +109,7 @@ public class CheckpointStorageLoaderTest {
         Configuration config = new Configuration();
         config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
         CheckpointStorage storage2 =
-                CheckpointStorageLoader.load(
-                        null, null, new ModernStateBackend(), config, cl, 
null);
+                CheckpointStorageLoader.load(null, null, new 
ModernStateBackend(), config, cl, log);
 
         Assert.assertThat(storage2, 
Matchers.instanceOf(FileSystemCheckpointStorage.class));
     }
@@ -121,7 +120,7 @@ public class CheckpointStorageLoaderTest {
 
         config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "does.not.exist");
         try {
-            CheckpointStorageLoader.load(null, null, new ModernStateBackend(), 
config, cl, null);
+            CheckpointStorageLoader.load(null, null, new ModernStateBackend(), 
config, cl, log);
             Assert.fail("should fail with exception");
         } catch (DynamicCodeLoadingException e) {
             // expected
@@ -130,7 +129,7 @@ public class CheckpointStorageLoaderTest {
         // try a class that is not a factory
         config.set(CheckpointingOptions.CHECKPOINT_STORAGE, 
java.io.File.class.getName());
         try {
-            CheckpointStorageLoader.load(null, null, new ModernStateBackend(), 
config, cl, null);
+            CheckpointStorageLoader.load(null, null, new ModernStateBackend(), 
config, cl, log);
             Assert.fail("should fail with exception");
         } catch (DynamicCodeLoadingException e) {
             // expected
@@ -139,7 +138,7 @@ public class CheckpointStorageLoaderTest {
         // try a factory that fails
         config.set(CheckpointingOptions.CHECKPOINT_STORAGE, 
FailingFactory.class.getName());
         try {
-            CheckpointStorageLoader.load(null, null, new ModernStateBackend(), 
config, cl, null);
+            CheckpointStorageLoader.load(null, null, new ModernStateBackend(), 
config, cl, log);
             Assert.fail("should fail with exception");
         } catch (IllegalConfigurationException e) {
             // expected
@@ -213,7 +212,7 @@ public class CheckpointStorageLoaderTest {
                         new ModernStateBackend(),
                         config,
                         cl,
-                        null);
+                        log);
 
         Assert.assertThat(storage, 
Matchers.instanceOf(JobManagerCheckpointStorage.class));
         JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage) 
storage;
@@ -238,7 +237,7 @@ public class CheckpointStorageLoaderTest {
                         new ModernStateBackend(),
                         config,
                         cl,
-                        null);
+                        log);
 
         Assert.assertThat(storage, 
Matchers.instanceOf(JobManagerCheckpointStorage.class));
         JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage) 
storage;
@@ -320,7 +319,7 @@ public class CheckpointStorageLoaderTest {
 
         final CheckpointStorage loadedStorage =
                 CheckpointStorageLoader.load(
-                        storage, null, new ModernStateBackend(), config, cl, 
null);
+                        storage, null, new ModernStateBackend(), config, cl, 
log);
         Assert.assertThat(loadedStorage, 
Matchers.instanceOf(FileSystemCheckpointStorage.class));
 
         final FileSystemCheckpointStorage fs = (FileSystemCheckpointStorage) 
loadedStorage;
@@ -383,10 +382,10 @@ public class CheckpointStorageLoaderTest {
 
         final CheckpointStorage loaded1 =
                 CheckpointStorageLoader.load(
-                        storage, null, new ModernStateBackend(), config1, cl, 
null);
+                        storage, null, new ModernStateBackend(), config1, cl, 
log);
         final CheckpointStorage loaded2 =
                 CheckpointStorageLoader.load(
-                        null, null, new ModernStateBackend(), config2, cl, 
null);
+                        null, null, new ModernStateBackend(), config2, cl, 
log);
 
         Assert.assertThat(loaded1, 
Matchers.instanceOf(JobManagerCheckpointStorage.class));
         Assert.assertThat(loaded2, 
Matchers.instanceOf(JobManagerCheckpointStorage.class));

Reply via email to