This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 658fac3 [FLINK-23840][runtime][logging] Clarify logging around CheckpointStorage 658fac3 is described below commit 658fac3736b73adf54b629242ede91313947e7e1 Author: David Moravek <d...@apache.org> AuthorDate: Tue Aug 24 10:05:56 2021 +0200 [FLINK-23840][runtime][logging] Clarify logging around CheckpointStorage --- .../runtime/state/CheckpointStorageLoader.java | 48 ++++++++++++++++------ .../state/memory/MemCheckpointStreamFactory.java | 9 ++-- .../runtime/state/CheckpointStorageLoaderTest.java | 31 +++++++------- 3 files changed, 54 insertions(+), 34 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java index f1e90a0..044d230 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java @@ -44,6 +44,9 @@ public class CheckpointStorageLoader { private static final String FILE_SYSTEM_STORAGE_NAME = "filesystem"; + private static final String LEGACY_PRECEDENCE_LOG_MESSAGE = + "Legacy state backends can also be used as checkpoint storage and take precedence for backward-compatibility reasons."; + /** * Loads the checkpoint storage from the configuration, from the parameter * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}. @@ -187,27 +190,46 @@ public class CheckpointStorageLoader { logger.info( "Using legacy state backend {} as Job checkpoint storage", rootStateBackend); + if (fromApplication != null) { + logger.warn( + "Checkpoint storage passed via StreamExecutionEnvironment is ignored because legacy state backend '{}' is used. {}", + rootStateBackend.getClass().getName(), + LEGACY_PRECEDENCE_LOG_MESSAGE); + } + if (config.get(CheckpointingOptions.CHECKPOINT_STORAGE) != null) { + logger.warn( + "Config option '{}' is ignored because legacy state backend '{}' is used. {}", + CheckpointingOptions.CHECKPOINT_STORAGE.key(), + rootStateBackend.getClass().getName(), + LEGACY_PRECEDENCE_LOG_MESSAGE); + } } - return (CheckpointStorage) rootStateBackend; - } else if (fromApplication instanceof ConfigurableCheckpointStorage) { - if (logger != null) { - logger.info( - "Using job/cluster config to configure application-defined checkpoint storage: {}", - fromApplication); - } + } - return ((ConfigurableCheckpointStorage) fromApplication).configure(config, classLoader); - } else if (fromApplication != null) { + if (fromApplication != null) { + if (fromApplication instanceof ConfigurableCheckpointStorage) { + if (logger != null) { + logger.info( + "Using job/cluster config to configure application-defined checkpoint storage: {}", + fromApplication); + if (config.get(CheckpointingOptions.CHECKPOINT_STORAGE) != null) { + logger.warn( + "Config option '{}' is ignored because the checkpoint storage passed via StreamExecutionEnvironment takes precedence.", + CheckpointingOptions.CHECKPOINT_STORAGE.key()); + } + } + return ((ConfigurableCheckpointStorage) fromApplication) + .configure(config, classLoader); + } if (logger != null) { logger.info("Using application defined checkpoint storage: {}", fromApplication); } - return fromApplication; - } else { - return fromConfig(config, classLoader, logger) - .orElseGet(() -> createDefaultCheckpointStorage(config, classLoader, logger)); } + + return fromConfig(config, classLoader, logger) + .orElseGet(() -> createDefaultCheckpointStorage(config, classLoader, logger)); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java index 9bef67b..5f36a42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java @@ -22,6 +22,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import javax.annotation.Nullable; @@ -59,11 +60,9 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { static void checkSize(int size, int maxSize) throws IOException { if (size > maxSize) { throw new IOException( - "Size of the state is larger than the maximum permitted memory-backed state. Size=" - + size - + " , maxSize=" - + maxSize - + " . Consider using a different state backend, like the File System State backend."); + String.format( + "Size of the state is larger than the maximum permitted memory-backed state. Size=%d, maxSize=%d. Consider using a different checkpoint storage, like the %s.", + size, maxSize, FileSystemCheckpointStorage.class.getSimpleName())); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoaderTest.java index fb0e4fd..e4aa5c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoaderTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.DynamicCodeLoadingException; +import org.apache.flink.util.TestLogger; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -50,7 +51,7 @@ import java.io.IOException; import java.util.Collection; /** This test validates that checkpoint storage is properly loaded from configuration. */ -public class CheckpointStorageLoaderTest { +public class CheckpointStorageLoaderTest extends TestLogger { @Rule public final TemporaryFolder tmp = new TemporaryFolder(); @@ -68,7 +69,7 @@ public class CheckpointStorageLoaderTest { CheckpointStorage storage = new MockStorage(); CheckpointStorage configured = - CheckpointStorageLoader.load(storage, null, legacy, new Configuration(), cl, null); + CheckpointStorageLoader.load(storage, null, legacy, new Configuration(), cl, log); Assert.assertEquals( "Legacy state backends should always take precedence", legacy, configured); @@ -80,7 +81,7 @@ public class CheckpointStorageLoaderTest { CheckpointStorage storage = new MockStorage(); CheckpointStorage configured = - CheckpointStorageLoader.load(storage, null, modern, new Configuration(), cl, null); + CheckpointStorageLoader.load(storage, null, modern, new Configuration(), cl, log); Assert.assertEquals( "Modern state backends should never take precedence", storage, configured); @@ -92,8 +93,7 @@ public class CheckpointStorageLoaderTest { config.set(CheckpointingOptions.CHECKPOINT_STORAGE, WorkingFactory.class.getName()); CheckpointStorage storage = - CheckpointStorageLoader.load( - null, null, new ModernStateBackend(), config, cl, null); + CheckpointStorageLoader.load(null, null, new ModernStateBackend(), config, cl, log); Assert.assertThat(storage, Matchers.instanceOf(MockStorage.class)); } @@ -101,7 +101,7 @@ public class CheckpointStorageLoaderTest { public void testDefaultCheckpointStorage() throws Exception { CheckpointStorage storage1 = CheckpointStorageLoader.load( - null, null, new ModernStateBackend(), new Configuration(), cl, null); + null, null, new ModernStateBackend(), new Configuration(), cl, log); Assert.assertThat(storage1, Matchers.instanceOf(JobManagerCheckpointStorage.class)); @@ -109,8 +109,7 @@ public class CheckpointStorageLoaderTest { Configuration config = new Configuration(); config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); CheckpointStorage storage2 = - CheckpointStorageLoader.load( - null, null, new ModernStateBackend(), config, cl, null); + CheckpointStorageLoader.load(null, null, new ModernStateBackend(), config, cl, log); Assert.assertThat(storage2, Matchers.instanceOf(FileSystemCheckpointStorage.class)); } @@ -121,7 +120,7 @@ public class CheckpointStorageLoaderTest { config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "does.not.exist"); try { - CheckpointStorageLoader.load(null, null, new ModernStateBackend(), config, cl, null); + CheckpointStorageLoader.load(null, null, new ModernStateBackend(), config, cl, log); Assert.fail("should fail with exception"); } catch (DynamicCodeLoadingException e) { // expected @@ -130,7 +129,7 @@ public class CheckpointStorageLoaderTest { // try a class that is not a factory config.set(CheckpointingOptions.CHECKPOINT_STORAGE, java.io.File.class.getName()); try { - CheckpointStorageLoader.load(null, null, new ModernStateBackend(), config, cl, null); + CheckpointStorageLoader.load(null, null, new ModernStateBackend(), config, cl, log); Assert.fail("should fail with exception"); } catch (DynamicCodeLoadingException e) { // expected @@ -139,7 +138,7 @@ public class CheckpointStorageLoaderTest { // try a factory that fails config.set(CheckpointingOptions.CHECKPOINT_STORAGE, FailingFactory.class.getName()); try { - CheckpointStorageLoader.load(null, null, new ModernStateBackend(), config, cl, null); + CheckpointStorageLoader.load(null, null, new ModernStateBackend(), config, cl, log); Assert.fail("should fail with exception"); } catch (IllegalConfigurationException e) { // expected @@ -213,7 +212,7 @@ public class CheckpointStorageLoaderTest { new ModernStateBackend(), config, cl, - null); + log); Assert.assertThat(storage, Matchers.instanceOf(JobManagerCheckpointStorage.class)); JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage) storage; @@ -238,7 +237,7 @@ public class CheckpointStorageLoaderTest { new ModernStateBackend(), config, cl, - null); + log); Assert.assertThat(storage, Matchers.instanceOf(JobManagerCheckpointStorage.class)); JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage) storage; @@ -320,7 +319,7 @@ public class CheckpointStorageLoaderTest { final CheckpointStorage loadedStorage = CheckpointStorageLoader.load( - storage, null, new ModernStateBackend(), config, cl, null); + storage, null, new ModernStateBackend(), config, cl, log); Assert.assertThat(loadedStorage, Matchers.instanceOf(FileSystemCheckpointStorage.class)); final FileSystemCheckpointStorage fs = (FileSystemCheckpointStorage) loadedStorage; @@ -383,10 +382,10 @@ public class CheckpointStorageLoaderTest { final CheckpointStorage loaded1 = CheckpointStorageLoader.load( - storage, null, new ModernStateBackend(), config1, cl, null); + storage, null, new ModernStateBackend(), config1, cl, log); final CheckpointStorage loaded2 = CheckpointStorageLoader.load( - null, null, new ModernStateBackend(), config2, cl, null); + null, null, new ModernStateBackend(), config2, cl, log); Assert.assertThat(loaded1, Matchers.instanceOf(JobManagerCheckpointStorage.class)); Assert.assertThat(loaded2, Matchers.instanceOf(JobManagerCheckpointStorage.class));