This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new e94f4de [FLINK-24552][tests] Moved randomization of buffer debloat from StreamEnvironment to MiniClusterResource e94f4de is described below commit e94f4de24b47c8b231ceb209b096e24dcfe2524e Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Wed Oct 27 17:48:19 2021 +0200 [FLINK-24552][tests] Moved randomization of buffer debloat from StreamEnvironment to MiniClusterResource The reason we moved it is that it is not possible to configure it via StreamEnvironment. This closes #17581 --- .../runtime/testutils/MiniClusterResource.java | 25 +++++++- .../streaming/util/TestStreamEnvironment.java | 70 ++++++++++++---------- 2 files changed, 63 insertions(+), 32 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java index d61a666..c6799f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java @@ -45,8 +45,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize; + /** Resource which starts a {@link MiniCluster} for testing purposes. */ public class MiniClusterResource extends ExternalResource { + private static final boolean RANDOMIZE_BUFFER_DEBLOAT_CONFIG = + Boolean.parseBoolean(System.getProperty("buffer-debloat.randomization", "false")); private static final MemorySize DEFAULT_MANAGED_MEMORY_SIZE = MemorySize.parse("80m"); @@ -105,7 +109,8 @@ public class MiniClusterResource extends ExternalResource { .toMilliseconds())); final List<CompletableFuture<Acknowledge>> jobCancellationFutures = - miniCluster.listJobs() + miniCluster + .listJobs() .get( jobCancellationDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS) @@ -120,7 +125,8 @@ public class MiniClusterResource extends ExternalResource { CommonTestUtils.waitUntilCondition( () -> { final long unfinishedJobs = - miniCluster.listJobs() + miniCluster + .listJobs() .get( jobCancellationDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS) @@ -186,6 +192,8 @@ public class MiniClusterResource extends ExternalResource { configuration.setInteger(JobManagerOptions.PORT, 0); configuration.setString(RestOptions.BIND_PORT, "0"); + randomizeConfiguration(configuration); + final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) @@ -206,6 +214,19 @@ public class MiniClusterResource extends ExternalResource { createClientConfiguration(restAddress); } + /** + * This is the place for randomization the configuration that relates to task execution such as + * TaskManagerConf. Configurations which relates to streaming should be randomized in + * TestStreamEnvironment#randomizeConfiguration. + */ + private static void randomizeConfiguration(Configuration configuration) { + // randomize ITTests for enabling buffer de-bloating + if (RANDOMIZE_BUFFER_DEBLOAT_CONFIG + && !configuration.contains(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)) { + randomize(configuration, TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true, false); + } + } + private void createClientConfiguration(URI restAddress) { Configuration restClientConfig = new Configuration(); restClientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost()); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 8df47f0..be095f8 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.util; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; @@ -45,8 +45,6 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { Boolean.parseBoolean(System.getProperty("checkpointing.randomization", "false")); private static final String STATE_CHANGE_LOG_CONFIG = System.getProperty("checkpointing.changelog", STATE_CHANGE_LOG_CONFIG_UNSET).trim(); - private static final boolean RANDOMIZE_BUFFER_DEBLOAT_CONFIG = - Boolean.parseBoolean(System.getProperty("buffer-debloat.randomization", "false")); public TestStreamEnvironment( MiniCluster miniCluster, @@ -86,33 +84,9 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { TestStreamEnvironment env = new TestStreamEnvironment( miniCluster, parallelism, jarFiles, classpaths); - if (RANDOMIZE_CHECKPOINTING_CONFIG) { - randomize( - conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false); - randomize( - conf, - ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT, - Duration.ofSeconds(0), - Duration.ofMillis(100), - Duration.ofSeconds(2)); - } - if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { - if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) { - conf.set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true); - } - } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase( - STATE_CHANGE_LOG_CONFIG_RAND)) { - if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) { - randomize( - conf, - CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, - true, - false); - } - } - if (RANDOMIZE_BUFFER_DEBLOAT_CONFIG) { - randomize(conf, TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true, false); - } + + randomizeConfiguration(miniCluster, conf); + env.configure(conf, env.getUserClassloader()); return env; }; @@ -120,6 +94,42 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { initializeContextEnvironment(factory); } + /** + * This is the place for randomization the configuration that relates to DataStream API such as + * ExecutionConf, CheckpointConf, StreamExecutionEnvironment. List of the configurations can be + * found here {@link StreamExecutionEnvironment#configure(ReadableConfig, ClassLoader)}. All + * other configuration should be randomized here {@link + * org.apache.flink.runtime.testutils.MiniClusterResource#randomizeConfiguration(Configuration)}. + */ + private static void randomizeConfiguration(MiniCluster miniCluster, Configuration conf) { + // randomize ITTests for enabling unaligned checkpoint + if (RANDOMIZE_CHECKPOINTING_CONFIG) { + randomize(conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false); + randomize( + conf, + ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT, + Duration.ofSeconds(0), + Duration.ofMillis(100), + Duration.ofSeconds(2)); + } + + // randomize ITTests for enabling state change log + if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { + if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) { + conf.set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true); + } + } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase( + STATE_CHANGE_LOG_CONFIG_RAND)) { + if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) { + randomize( + conf, + CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, + true, + false); + } + } + } + private static boolean isConfigurationSupportedByChangelog(Configuration configuration) { return !configuration.get(LOCAL_RECOVERY); }