This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new e94f4de  [FLINK-24552][tests] Moved randomization of buffer debloat 
from StreamEnvironment to MiniClusterResource
e94f4de is described below

commit e94f4de24b47c8b231ceb209b096e24dcfe2524e
Author: Anton Kalashnikov <kaa....@yandex.ru>
AuthorDate: Wed Oct 27 17:48:19 2021 +0200

    [FLINK-24552][tests] Moved randomization of buffer debloat from 
StreamEnvironment to MiniClusterResource
    
    The reason we moved it is that it is not possible to configure it via 
StreamEnvironment.
    
    This closes #17581
---
 .../runtime/testutils/MiniClusterResource.java     | 25 +++++++-
 .../streaming/util/TestStreamEnvironment.java      | 70 ++++++++++++----------
 2 files changed, 63 insertions(+), 32 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
index d61a666..c6799f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -45,8 +45,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize;
+
 /** Resource which starts a {@link MiniCluster} for testing purposes. */
 public class MiniClusterResource extends ExternalResource {
+    private static final boolean RANDOMIZE_BUFFER_DEBLOAT_CONFIG =
+            
Boolean.parseBoolean(System.getProperty("buffer-debloat.randomization", 
"false"));
 
     private static final MemorySize DEFAULT_MANAGED_MEMORY_SIZE = 
MemorySize.parse("80m");
 
@@ -105,7 +109,8 @@ public class MiniClusterResource extends ExternalResource {
                                             .toMilliseconds()));
 
             final List<CompletableFuture<Acknowledge>> jobCancellationFutures =
-                    miniCluster.listJobs()
+                    miniCluster
+                            .listJobs()
                             .get(
                                     
jobCancellationDeadline.timeLeft().toMillis(),
                                     TimeUnit.MILLISECONDS)
@@ -120,7 +125,8 @@ public class MiniClusterResource extends ExternalResource {
             CommonTestUtils.waitUntilCondition(
                     () -> {
                         final long unfinishedJobs =
-                                miniCluster.listJobs()
+                                miniCluster
+                                        .listJobs()
                                         .get(
                                                 
jobCancellationDeadline.timeLeft().toMillis(),
                                                 TimeUnit.MILLISECONDS)
@@ -186,6 +192,8 @@ public class MiniClusterResource extends ExternalResource {
         configuration.setInteger(JobManagerOptions.PORT, 0);
         configuration.setString(RestOptions.BIND_PORT, "0");
 
+        randomizeConfiguration(configuration);
+
         final MiniClusterConfiguration miniClusterConfiguration =
                 new MiniClusterConfiguration.Builder()
                         .setConfiguration(configuration)
@@ -206,6 +214,19 @@ public class MiniClusterResource extends ExternalResource {
         createClientConfiguration(restAddress);
     }
 
+    /**
+     * This is the place for randomizing the configuration that relates to 
task execution such as
+     * TaskManagerConf. Configurations which relate to streaming should be 
randomized in
+     * TestStreamEnvironment#randomizeConfiguration.
+     */
+    private static void randomizeConfiguration(Configuration configuration) {
+        // randomize ITTests for enabling buffer de-bloating
+        if (RANDOMIZE_BUFFER_DEBLOAT_CONFIG
+                && 
!configuration.contains(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)) {
+            randomize(configuration, 
TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true, false);
+        }
+    }
+
     private void createClientConfiguration(URI restAddress) {
         Configuration restClientConfig = new Configuration();
         restClientConfig.setString(JobManagerOptions.ADDRESS, 
restAddress.getHost());
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 8df47f0..be095f8 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
@@ -45,8 +45,6 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
             
Boolean.parseBoolean(System.getProperty("checkpointing.randomization", 
"false"));
     private static final String STATE_CHANGE_LOG_CONFIG =
             System.getProperty("checkpointing.changelog", 
STATE_CHANGE_LOG_CONFIG_UNSET).trim();
-    private static final boolean RANDOMIZE_BUFFER_DEBLOAT_CONFIG =
-            
Boolean.parseBoolean(System.getProperty("buffer-debloat.randomization", 
"false"));
 
     public TestStreamEnvironment(
             MiniCluster miniCluster,
@@ -86,33 +84,9 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
                     TestStreamEnvironment env =
                             new TestStreamEnvironment(
                                     miniCluster, parallelism, jarFiles, 
classpaths);
-                    if (RANDOMIZE_CHECKPOINTING_CONFIG) {
-                        randomize(
-                                conf, 
ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false);
-                        randomize(
-                                conf,
-                                
ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT,
-                                Duration.ofSeconds(0),
-                                Duration.ofMillis(100),
-                                Duration.ofSeconds(2));
-                    }
-                    if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
-                        if 
(isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
-                            
conf.set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true);
-                        }
-                    } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(
-                            STATE_CHANGE_LOG_CONFIG_RAND)) {
-                        if 
(isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
-                            randomize(
-                                    conf,
-                                    
CheckpointingOptions.ENABLE_STATE_CHANGE_LOG,
-                                    true,
-                                    false);
-                        }
-                    }
-                    if (RANDOMIZE_BUFFER_DEBLOAT_CONFIG) {
-                        randomize(conf, 
TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true, false);
-                    }
+
+                    randomizeConfiguration(miniCluster, conf);
+
                     env.configure(conf, env.getUserClassloader());
                     return env;
                 };
@@ -120,6 +94,42 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
         initializeContextEnvironment(factory);
     }
 
+    /**
+     * This is the place for randomizing the configuration that relates to 
the DataStream API such as
+     * ExecutionConf, CheckpointConf, StreamExecutionEnvironment. The list of the 
configurations can be
+     * found in {@link StreamExecutionEnvironment#configure(ReadableConfig, 
ClassLoader)}. All
+     * other configurations should be randomized in {@link
+     * 
org.apache.flink.runtime.testutils.MiniClusterResource#randomizeConfiguration(Configuration)}.
+     */
+    private static void randomizeConfiguration(MiniCluster miniCluster, 
Configuration conf) {
+        // randomize ITTests for enabling unaligned checkpoint
+        if (RANDOMIZE_CHECKPOINTING_CONFIG) {
+            randomize(conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, 
true, false);
+            randomize(
+                    conf,
+                    ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT,
+                    Duration.ofSeconds(0),
+                    Duration.ofMillis(100),
+                    Duration.ofSeconds(2));
+        }
+
+        // randomize ITTests for enabling state change log
+        if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
+            if 
(isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
+                conf.set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true);
+            }
+        } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(
+                STATE_CHANGE_LOG_CONFIG_RAND)) {
+            if 
(isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
+                randomize(
+                        conf,
+                        CheckpointingOptions.ENABLE_STATE_CHANGE_LOG,
+                        true,
+                        false);
+            }
+        }
+    }
+
     private static boolean isConfigurationSupportedByChangelog(Configuration 
configuration) {
         return !configuration.get(LOCAL_RECOVERY);
     }

Reply via email to